Write output as chunks

This commit is contained in:
maciejhirsz 2017-02-15 19:25:57 +01:00
parent d005410e1a
commit 9cfa27830c
2 changed files with 91 additions and 24 deletions

View File

@ -36,23 +36,25 @@ pub enum Out {
/// Request/response handler /// Request/response handler
pub struct IpfsHandler { pub struct IpfsHandler {
/// Reference to the Blockchain Client
client: Arc<BlockChainClient>, client: Arc<BlockChainClient>,
out: Out,
/// Response to send out
pub out: Out,
/// How many bytes from the response have been written
pub out_progress: usize,
} }
impl IpfsHandler { impl IpfsHandler {
pub fn new(client: Arc<BlockChainClient>) -> Self { pub fn new(client: Arc<BlockChainClient>) -> Self {
IpfsHandler { IpfsHandler {
client: client, client: client,
out: Out::Bad("Invalid Request") out: Out::Bad("Invalid Request"),
out_progress: 0,
} }
} }
/// Exposes the outgoing state. The outgoing state should be immutable from the outside.
pub fn out(&self) -> &Out {
&self.out
}
/// Route path + query string to a specialized method /// Route path + query string to a specialized method
pub fn route(&mut self, path: &str, query: Option<&str>) -> Next { pub fn route(&mut self, path: &str, query: Option<&str>) -> Next {
self.out = match path { self.out = match path {
@ -216,7 +218,7 @@ mod tests {
let _ = handler.route("/api/v0/block/get", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); let _ = handler.route("/api/v0/block/get", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM"));
assert_eq!(handler.out(), &Out::NotFound("Block not found")); assert_eq!(handler.out, Out::NotFound("Block not found"));
} }
#[test] #[test]
@ -225,7 +227,7 @@ mod tests {
let _ = handler.route("/api/v0/block/get", None); let _ = handler.route("/api/v0/block/get", None);
assert_eq!(handler.out(), &Out::Bad("CID parsing failed")); assert_eq!(handler.out, Out::Bad("CID parsing failed"));
} }
#[test] #[test]
@ -234,7 +236,7 @@ mod tests {
let _ = handler.route("/api/v0/block/get", Some("arg=foobarz43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); let _ = handler.route("/api/v0/block/get", Some("arg=foobarz43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM"));
assert_eq!(handler.out(), &Out::Bad("CID parsing failed")); assert_eq!(handler.out, Out::Bad("CID parsing failed"));
} }
#[test] #[test]
@ -243,6 +245,6 @@ mod tests {
let _ = handler.route("/foo/bar/baz", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); let _ = handler.route("/foo/bar/baz", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM"));
assert_eq!(handler.out(), &Out::NotFound("Route not found")); assert_eq!(handler.out, Out::NotFound("Route not found"));
} }
} }

View File

@ -27,6 +27,7 @@ extern crate ethcore_util as util;
mod error; mod error;
mod handler; mod handler;
use std::io::Write;
use std::sync::Arc; use std::sync::Arc;
use error::ServerError; use error::ServerError;
use handler::{IpfsHandler, Out}; use handler::{IpfsHandler, Out};
@ -58,7 +59,7 @@ impl Handler<HttpStream> for IpfsHandler {
fn on_response(&mut self, res: &mut Response) -> Next { fn on_response(&mut self, res: &mut Response) -> Next {
use Out::*; use Out::*;
match *self.out() { match self.out {
OctetStream(ref bytes) => { OctetStream(ref bytes) => {
use mime::{Mime, TopLevel, SubLevel}; use mime::{Mime, TopLevel, SubLevel};
@ -97,20 +98,33 @@ impl Handler<HttpStream> for IpfsHandler {
fn on_response_writable(&mut self, transport: &mut Encoder<HttpStream>) -> Next { fn on_response_writable(&mut self, transport: &mut Encoder<HttpStream>) -> Next {
use Out::*; use Out::*;
match *self.out() { // Get the data to write as a byte slice
OctetStream(ref bytes) => { let data = match self.out {
// Nothing to do here OctetStream(ref bytes) => &bytes,
let _ = transport.write(&bytes); NotFound(reason) | Bad(reason) => reason.as_bytes(),
};
Next::end() write_chunk(transport, &mut self.out_progress, data)
}, }
NotFound(reason) | Bad(reason) => { }
// Nothing to do here
let _ = transport.write(reason.as_bytes());
Next::end() fn write_chunk<W: Write>(transport: &mut W, progress: &mut usize, data: &[u8]) -> Next {
} // Skip any bytes that have already been written
} let chunk = &data[*progress..];
// Write an get written count
let written = match transport.write(chunk) {
Ok(written) => written,
Err(_) => return Next::end(),
};
*progress += written;
// Close the connection if the entire chunk has been written, otherwise increment progress
if written < chunk.len() {
Next::write()
} else {
Next::end()
} }
} }
@ -130,3 +144,54 @@ pub fn start_server(client: Arc<BlockChainClient>) -> Result<Listening, ServerEr
})? })?
) )
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn write_chunk_to_vec() {
let mut transport = Vec::new();
let mut progress = 0;
let _ = write_chunk(&mut transport, &mut progress, b"foobar");
assert_eq!(b"foobar".to_vec(), transport);
assert_eq!(6, progress);
}
#[test]
fn write_chunk_to_vec_part() {
let mut transport = Vec::new();
let mut progress = 3;
let _ = write_chunk(&mut transport, &mut progress, b"foobar");
assert_eq!(b"bar".to_vec(), transport);
assert_eq!(6, progress);
}
#[test]
fn write_chunk_to_array() {
use std::io::Cursor;
let mut buf = [0u8; 3];
let mut progress = 0;
{
let mut transport: Cursor<&mut [u8]> = Cursor::new(&mut buf);
let _ = write_chunk(&mut transport, &mut progress, b"foobar");
}
assert_eq!(*b"foo", buf);
assert_eq!(3, progress);
{
let mut transport: Cursor<&mut [u8]> = Cursor::new(&mut buf);
let _ = write_chunk(&mut transport, &mut progress, b"foobar");
}
assert_eq!(*b"bar", buf);
assert_eq!(6, progress);
}
}