diff --git a/ethcore/light/src/net/error.rs b/ethcore/light/src/net/error.rs index 86dbd54ba..c9ba85a42 100644 --- a/ethcore/light/src/net/error.rs +++ b/ethcore/light/src/net/error.rs @@ -56,6 +56,8 @@ pub enum Error { UnknownPeer, /// Unsolicited response. UnsolicitedResponse, + /// Not a server. + NotServer, } impl Error { @@ -70,6 +72,7 @@ impl Error { Error::WrongNetwork => Punishment::Disable, Error::UnknownPeer => Punishment::Disconnect, Error::UnsolicitedResponse => Punishment::Disable, + Error::NotServer => Punishment::Disable, } } } @@ -97,6 +100,7 @@ impl fmt::Display for Error { Error::WrongNetwork => write!(f, "Wrong network"), Error::UnknownPeer => write!(f, "Unknown peer"), Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"), + Error::NotServer => write!(f, "Peer not a server."), } } } \ No newline at end of file diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 0eda7b77c..1eb89a92f 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -116,10 +116,9 @@ struct PendingPeer { // data about each peer. struct Peer { local_buffer: Buffer, // their buffer relative to us - remote_buffer: Buffer, // our buffer relative to them status: Status, capabilities: Capabilities, - remote_flow: FlowParams, + remote_flow: Option<(Buffer, FlowParams)>, sent_head: H256, // last head we've given them. last_update: SteadyTime, } @@ -142,12 +141,6 @@ impl Peer { self.local_buffer.current() } - - // recharge remote buffer with remote flow params. - fn recharge_remote(&mut self) { - let flow = &mut self.remote_flow; - flow.recharge(&mut self.remote_buffer); - } } /// An LES event handler. @@ -250,16 +243,21 @@ impl LightProtocol { /// Check the maximum amount of requests of a specific type /// which a peer would be able to serve. pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option { - self.peers.read().get(&peer).map(|peer| { + self.peers.read().get(&peer).and_then(|peer| { let mut peer = peer.lock(); - peer.recharge_remote(); - peer.remote_flow.max_amount(&peer.remote_buffer, kind) + match peer.remote_flow.as_mut() { + Some(&mut (ref mut buf, ref flow)) => { + flow.recharge(buf); + Some(flow.max_amount(&*buf, kind)) + } + None => None, + } }) } /// Make a request to a peer. /// - /// Fails on: nonexistent peer, network error, + /// Fails on: nonexistent peer, network error, peer not server, /// insufficient buffer. Does not check capabilities before sending. /// On success, returns a request id which can later be coordinated /// with an event. @@ -268,10 +266,14 @@ impl LightProtocol { let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)); let mut peer = peer.lock(); - peer.recharge_remote(); - - let max = peer.remote_flow.compute_cost(request.kind(), request.amount()); - try!(peer.remote_buffer.deduct_cost(max)); + match peer.remote_flow.as_mut() { + Some(&mut (ref mut buf, ref flow)) => { + flow.recharge(buf); + let max = flow.compute_cost(request.kind(), request.amount()); + try!(buf.deduct_cost(max)); + } + None => return Err(Error::NotServer), + } let req_id = self.req_id.fetch_add(1, Ordering::SeqCst); let packet_data = encode_request(&request, req_id); @@ -390,8 +392,13 @@ impl LightProtocol { match peers.get(peer) { Some(peer_info) => { let mut peer_info = peer_info.lock(); - let actual_buffer = ::std::cmp::min(cur_buffer, *peer_info.remote_flow.limit()); - peer_info.remote_buffer.update_to(actual_buffer); + match peer_info.remote_flow.as_mut() { + Some(&mut (ref mut buf, ref mut flow)) => { + let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit()); + buf.update_to(actual_buffer) + } + None => return Err(Error::NotServer), // this really should be impossible. + } Ok(ReqId(req_id)) } None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. @@ -516,7 +523,7 @@ impl LightProtocol { }; let capabilities = self.capabilities.read().clone(); - let status_packet = status::write_handshake(&status, &capabilities, &self.flow_params); + let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params)); io.send(peer, packet::STATUS, status_packet); @@ -543,12 +550,13 @@ impl LightProtocol { return Err(Error::WrongNetwork); } + let remote_flow = flow_params.map(|params| (params.create_buffer(), params)); + self.peers.write().insert(*peer, Mutex::new(Peer { local_buffer: self.flow_params.create_buffer(), - remote_buffer: flow_params.create_buffer(), status: status.clone(), capabilities: capabilities.clone(), - remote_flow: flow_params, + remote_flow: remote_flow, sent_head: pending.sent_head, last_update: pending.last_update, })); diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs index eb80fbe44..59981b88d 100644 --- a/ethcore/light/src/net/status.rs +++ b/ethcore/light/src/net/status.rs @@ -198,7 +198,7 @@ impl Capabilities { /// - chain status /// - serving capabilities /// - buffer flow parameters -pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> { +pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, Option), DecoderError> { let mut parser = Parser { pos: 0, rlp: rlp, @@ -221,17 +221,20 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP tx_relay: parser.expect_raw(Key::TxRelay).is_ok(), }; - let flow_params = FlowParams::new( - try!(parser.expect(Key::BufferLimit)), - try!(parser.expect(Key::BufferCostTable)), - try!(parser.expect(Key::BufferRechargeRate)), - ); + let flow_params = match ( + parser.expect(Key::BufferLimit), + parser.expect(Key::BufferCostTable), + parser.expect(Key::BufferRechargeRate) + ) { + (Ok(bl), Ok(bct), Ok(brr)) => Some(FlowParams::new(bl, bct, brr)), + _ => None, + }; Ok((status, capabilities, flow_params)) } /// Write a handshake, given status, capabilities, and flow parameters. -pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec { +pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: Option<&FlowParams>) -> Vec { let mut pairs = Vec::new(); pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64))); @@ -253,9 +256,11 @@ pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params pairs.push(encode_flag(Key::TxRelay)); } - pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); - pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); - pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); + if let Some(flow_params) = flow_params { + pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); + pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); + pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); + } let mut stream = RlpStream::new_list(pairs.len()); @@ -386,14 +391,14 @@ mod tests { 1000.into(), ); - let handshake = write_handshake(&status, &capabilities, &flow_params); + let handshake = write_handshake(&status, &capabilities, Some(&flow_params)); let (read_status, read_capabilities, read_flow) = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); assert_eq!(read_status, status); assert_eq!(read_capabilities, capabilities); - assert_eq!(read_flow, flow_params); + assert_eq!(read_flow.unwrap(), flow_params); } #[test] @@ -421,14 +426,14 @@ mod tests { 1000.into(), ); - let handshake = write_handshake(&status, &capabilities, &flow_params); + let handshake = write_handshake(&status, &capabilities, Some(&flow_params)); let (read_status, read_capabilities, read_flow) = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); assert_eq!(read_status, status); assert_eq!(read_capabilities, capabilities); - assert_eq!(read_flow, flow_params); + assert_eq!(read_flow.unwrap(), flow_params); } #[test] @@ -456,7 +461,7 @@ mod tests { 1000.into(), ); - let handshake = write_handshake(&status, &capabilities, &flow_params); + let handshake = write_handshake(&status, &capabilities, Some(&flow_params)); let interleaved = { let handshake = UntrustedRlp::new(&handshake); let mut stream = RlpStream::new_list(handshake.item_count() * 3); @@ -478,7 +483,7 @@ mod tests { assert_eq!(read_status, status); assert_eq!(read_capabilities, capabilities); - assert_eq!(read_flow, flow_params); + assert_eq!(read_flow.unwrap(), flow_params); } #[test] @@ -528,4 +533,33 @@ mod tests { let out = stream.drain(); assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok()); } + + #[test] + fn optional_flow() { + let status = Status { + protocol_version: 1, + network_id: 1, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: H256::zero(), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: true, + serve_chain_since: Some(5), + serve_state_since: Some(8), + tx_relay: true, + }; + + let handshake = write_handshake(&status, &capabilities, None); + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert!(read_flow.is_none()); + } } \ No newline at end of file diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 30ab2bab2..7a159b6a4 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -204,7 +204,7 @@ fn handshake_expected() { let status = status(provider.client.chain_info()); - let packet_body = write_handshake(&status, &capabilities, &flow_params); + let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } @@ -220,7 +220,7 @@ fn genesis_mismatch() { let mut status = status(provider.client.chain_info()); status.genesis_hash = H256::default(); - let packet_body = write_handshake(&status, &capabilities, &flow_params); + let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } @@ -235,12 +235,12 @@ fn buffer_overflow() { let status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&status, &capabilities, &flow_params); + let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } { - let my_status = write_handshake(&status, &capabilities, &flow_params); + let my_status = write_handshake(&status, &capabilities, Some(&flow_params)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -267,14 +267,14 @@ fn get_block_headers() { let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, &flow_params); + let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); provider.client.add_blocks(100, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -317,14 +317,14 @@ fn get_block_bodies() { let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, &flow_params); + let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); provider.client.add_blocks(100, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -364,14 +364,14 @@ fn get_block_receipts() { let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); - let my_status = write_handshake(&cur_status, &capabilities, &flow_params); + let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); provider.client.add_blocks(1000, EachBlockWith::Nothing); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -419,7 +419,7 @@ fn get_state_proofs() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -468,7 +468,7 @@ fn get_contract_code() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); }