diff --git a/ipfs/src/handler.rs b/ipfs/src/handler.rs index 7dd83da47..5197e8a1a 100644 --- a/ipfs/src/handler.rs +++ b/ipfs/src/handler.rs @@ -36,23 +36,25 @@ pub enum Out { /// Request/response handler pub struct IpfsHandler { + /// Reference to the Blockchain Client client: Arc, - out: Out, + + /// Response to send out + pub out: Out, + + /// How many bytes from the response have been written + pub out_progress: usize, } impl IpfsHandler { pub fn new(client: Arc) -> Self { IpfsHandler { client: client, - out: Out::Bad("Invalid Request") + out: Out::Bad("Invalid Request"), + out_progress: 0, } } - /// Exposes the outgoing state. The outgoing state should be immutable from the outside. - pub fn out(&self) -> &Out { - &self.out - } - /// Route path + query string to a specialized method pub fn route(&mut self, path: &str, query: Option<&str>) -> Next { self.out = match path { @@ -216,7 +218,7 @@ mod tests { let _ = handler.route("/api/v0/block/get", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); - assert_eq!(handler.out(), &Out::NotFound("Block not found")); + assert_eq!(handler.out, Out::NotFound("Block not found")); } #[test] @@ -225,7 +227,7 @@ mod tests { let _ = handler.route("/api/v0/block/get", None); - assert_eq!(handler.out(), &Out::Bad("CID parsing failed")); + assert_eq!(handler.out, Out::Bad("CID parsing failed")); } #[test] @@ -234,7 +236,7 @@ mod tests { let _ = handler.route("/api/v0/block/get", Some("arg=foobarz43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); - assert_eq!(handler.out(), &Out::Bad("CID parsing failed")); + assert_eq!(handler.out, Out::Bad("CID parsing failed")); } #[test] @@ -243,6 +245,6 @@ mod tests { let _ = handler.route("/foo/bar/baz", Some("arg=z43AaGF5tmkT9SEX6urrhwpEW5ZSaACY73Vw357ZXTsur2fR8BM")); - assert_eq!(handler.out(), &Out::NotFound("Route not found")); + assert_eq!(handler.out, Out::NotFound("Route not found")); } } diff --git a/ipfs/src/lib.rs b/ipfs/src/lib.rs index 776dfe85d..5c95fe88e 100644 --- a/ipfs/src/lib.rs +++ b/ipfs/src/lib.rs @@ -27,6 +27,7 @@ extern crate ethcore_util as util; mod error; mod handler; +use std::io::Write; use std::sync::Arc; use error::ServerError; use handler::{IpfsHandler, Out}; @@ -58,7 +59,7 @@ impl Handler for IpfsHandler { fn on_response(&mut self, res: &mut Response) -> Next { use Out::*; - match *self.out() { + match self.out { OctetStream(ref bytes) => { use mime::{Mime, TopLevel, SubLevel}; @@ -97,20 +98,33 @@ impl Handler for IpfsHandler { fn on_response_writable(&mut self, transport: &mut Encoder) -> Next { use Out::*; - match *self.out() { - OctetStream(ref bytes) => { - // Nothing to do here - let _ = transport.write(&bytes); + // Get the data to write as a byte slice + let data = match self.out { + OctetStream(ref bytes) => &bytes, + NotFound(reason) | Bad(reason) => reason.as_bytes(), + }; - Next::end() - }, - NotFound(reason) | Bad(reason) => { - // Nothing to do here - let _ = transport.write(reason.as_bytes()); + write_chunk(transport, &mut self.out_progress, data) + } +} - Next::end() - } - } +fn write_chunk(transport: &mut W, progress: &mut usize, data: &[u8]) -> Next { + // Skip any bytes that have already been written + let chunk = &data[*progress..]; + + // Write an get written count + let written = match transport.write(chunk) { + Ok(written) => written, + Err(_) => return Next::end(), + }; + + *progress += written; + + // Close the connection if the entire chunk has been written, otherwise increment progress + if written < chunk.len() { + Next::write() + } else { + Next::end() } } @@ -130,3 +144,54 @@ pub fn start_server(client: Arc) -> Result = Cursor::new(&mut buf); + let _ = write_chunk(&mut transport, &mut progress, b"foobar"); + } + + assert_eq!(*b"foo", buf); + assert_eq!(3, progress); + + { + let mut transport: Cursor<&mut [u8]> = Cursor::new(&mut buf); + let _ = write_chunk(&mut transport, &mut progress, b"foobar"); + } + + assert_eq!(*b"bar", buf); + assert_eq!(6, progress); + } +}