Abort support

This commit is contained in:
Tomasz Drwięga 2016-08-31 11:38:39 +02:00
parent 0f0af9c1a5
commit 8f13b550d8
5 changed files with 33 additions and 14 deletions

2
Cargo.lock generated
View File

@ -1758,7 +1758,7 @@ dependencies = [
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a"
"checksum lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "49247ec2a285bb3dcb23cbd9c35193c025e7251bfce77c1d5da97e6362dffe7f"
"checksum libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "97def9dc7ce1d8e153e693e3a33020bc69972181adb2f871e87e888876feae49"
"checksum libc 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "23e3757828fa702a20072c37ff47938e9dd331b92fac6e223d26d4b7a55f7ee2"
"checksum linked-hash-map 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d262045c5b87c0861b3f004610afd0e2c851e2908d08b6c870cbb9d5f494ecd"
"checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054"
"checksum matches 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "15305656809ce5a4805b1ff2946892810992197ce1270ff79baded852187942e"

View File

@ -97,7 +97,7 @@ impl Fetch {
self.abort.load(Ordering::Relaxed)
}
fn mark_aborted(&mut self) -> Next {
self.result = Some(Err(Error::Aborted));
self.result = Some(Err(Error::Aborted.into()));
Next::end()
}
}

View File

@ -19,7 +19,8 @@
pub mod fetch_file;
use std::env;
use std::sync::mpsc;
use std::sync::{mpsc, Arc};
use std::sync::atomic::AtomicBool;
use std::path::PathBuf;
use hyper;
@ -62,7 +63,7 @@ impl Client {
self.https_client.close();
}
pub fn request(&mut self, url: String, on_done: Box<Fn() + Send>) -> Result<mpsc::Receiver<FetchResult>, FetchError> {
pub fn request(&mut self, url: String, abort: Arc<AtomicBool>, on_done: Box<Fn() + Send>) -> Result<mpsc::Receiver<FetchResult>, FetchError> {
let is_https = url.starts_with("https://");
let url = try!(url.parse().map_err(|_| FetchError::InvalidUrl));
trace!(target: "dapps", "Fetching from: {:?}", url);
@ -71,7 +72,7 @@ impl Client {
let (tx, rx) = mpsc::channel();
let temp_path = Self::temp_path();
let res = self.https_client.fetch_to_file(url, temp_path.clone(), move |result| {
let res = self.https_client.fetch_to_file(url, temp_path.clone(), abort, move |result| {
let res = tx.send(
result.map(|_| temp_path).map_err(FetchError::Https)
);
@ -87,7 +88,7 @@ impl Client {
}
} else {
let (tx, rx) = mpsc::channel();
let res = self.http_client.request(url, Fetch::new(tx, on_done));
let res = self.http_client.request(url, Fetch::new(tx, abort, on_done));
match res {
Ok(_) => Ok(rx),

View File

@ -16,6 +16,8 @@
use std::cell::RefCell;
use std::{fs, str, thread};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::path::PathBuf;
use std::io::{self, Write};
use std::collections::HashMap;
@ -56,7 +58,7 @@ impl From<TlsClientError> for FetchError {
pub type FetchResult = Result<(), FetchError>;
pub enum ClientMessage {
Fetch(Url, Box<io::Write + Send>, Box<FnMut(FetchResult) + Send>),
Fetch(Url, Box<io::Write + Send>, Arc<AtomicBool>, Box<FnMut(FetchResult) + Send>),
Shutdown,
}
@ -93,9 +95,9 @@ impl Client {
})
}
pub fn fetch_to_file<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, path: PathBuf, callback: F) -> Result<(), FetchError> {
pub fn fetch_to_file<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, path: PathBuf, abort: Arc<AtomicBool>, callback: F) -> Result<(), FetchError> {
let file = try!(fs::File::create(&path));
self.fetch(url, Box::new(file), move |result| {
self.fetch(url, Box::new(file), abort, move |result| {
if let Err(_) = result {
// remove temporary file
let _ = fs::remove_file(&path);
@ -104,9 +106,9 @@ impl Client {
})
}
pub fn fetch<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, writer: Box<io::Write + Send>, callback: F) -> Result<(), FetchError> {
pub fn fetch<F: FnOnce(FetchResult) + Send + 'static>(&self, url: Url, writer: Box<io::Write + Send>, abort: Arc<AtomicBool>, callback: F) -> Result<(), FetchError> {
let cell = RefCell::new(Some(callback));
try!(self.channel.send(ClientMessage::Fetch(url, writer, Box::new(move |res| {
try!(self.channel.send(ClientMessage::Fetch(url, writer, abort, Box::new(move |res| {
cell.borrow_mut().take().expect("Called only once.")(res);
}))));
Ok(())
@ -148,11 +150,11 @@ impl mio::Handler for ClientLoop {
fn notify(&mut self, event_loop: &mut mio::EventLoop<Self>, msg: Self::Message) {
match msg {
ClientMessage::Shutdown => event_loop.shutdown(),
ClientMessage::Fetch(url, writer, callback) => {
ClientMessage::Fetch(url, writer, abort, callback) => {
let token = self.next_token;
self.next_token += 1;
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, callback) {
if let Ok(mut tlsclient) = TlsClient::new(mio::Token(token), &url, writer, abort, callback) {
let httpreq = format!(
"GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\nAccept-Encoding: identity\r\n\r\n",
url.path(),

View File

@ -16,6 +16,7 @@
use std::str;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::io::{self, Write, Read, Cursor, BufReader};
use mio;
@ -28,6 +29,7 @@ use client::{FetchError, ClientLoop, FetchResult};
#[derive(Debug)]
pub enum TlsClientError {
Aborted,
Initialization,
UnexpectedEof,
Connection(io::Error),
@ -38,6 +40,7 @@ pub enum TlsClientError {
/// This encapsulates the TCP-level connection, some connection
/// state, and the underlying TLS-level session.
pub struct TlsClient {
abort: Arc<AtomicBool>,
token: mio::Token,
socket: TcpStream,
tls_session: rustls::ClientSession,
@ -81,6 +84,7 @@ impl TlsClient {
token: mio::Token,
url: &Url,
writer: Box<io::Write + Send>,
abort: Arc<AtomicBool>,
mut callback: Box<FnMut(FetchResult) + Send>,
) -> Result<Self, FetchError> {
let res = TlsClient::make_config().and_then(|cfg| {
@ -91,6 +95,7 @@ impl TlsClient {
match res {
Ok((cfg, sock)) => Ok(TlsClient {
abort: abort,
token: token,
writer: HttpProcessor::new(writer),
socket: sock,
@ -111,6 +116,13 @@ impl TlsClient {
pub fn ready(&mut self, event_loop: &mut mio::EventLoop<ClientLoop>, token: mio::Token, events: mio::EventSet) -> bool {
assert_eq!(token, self.token);
let aborted = self.is_aborted();
if aborted {
// do_write needs to be invoked after that
self.tls_session.send_close_notify();
self.error = Some(TlsClientError::Aborted);
}
if events.is_readable() {
self.do_read();
}
@ -119,7 +131,7 @@ impl TlsClient {
self.do_write();
}
if self.is_closed() {
if self.is_closed() || aborted {
trace!("Connection closed");
let callback = &mut self.callback;
callback(match self.error.take() {
@ -226,5 +238,9 @@ impl TlsClient {
fn is_closed(&self) -> bool {
self.closing
}
fn is_aborted(&self) -> bool {
self.abort.load(Ordering::Relaxed)
}
}