diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs index deed1cded..7ee287533 100644 --- a/ethcore/light/src/net/buffer_flow.rs +++ b/ethcore/light/src/net/buffer_flow.rs @@ -171,7 +171,7 @@ pub struct FlowParams { impl FlowParams { /// Create new flow parameters from a request cost table, /// buffer limit, and (minimum) rate of recharge. - pub fn new(costs: CostTable, limit: U256, recharge: U256) -> Self { + pub fn new(limit: U256, costs: CostTable, recharge: U256) -> Self { FlowParams { costs: costs, limit: limit, @@ -179,6 +179,15 @@ impl FlowParams { } } + /// Get a reference to the buffer limit. + pub fn limit(&self) -> &U256 { &self.limit } + + /// Get a reference to the cost table. + pub fn cost_table(&self) -> &CostTable { &self.costs } + + /// Get a reference to the recharge rate. + pub fn recharge_rate(&self) -> &U256 { &self.recharge } + /// Estimate the maximum cost of the request. pub fn max_cost(&self, req: &Request) -> U256 { let amount = match *req { @@ -253,7 +262,7 @@ mod tests { use std::thread; use std::time::Duration; - let flow_params = FlowParams::new(Default::default(), 100.into(), 20.into()); + let flow_params = FlowParams::new(100.into(), Default::default(), 20.into()); let mut buffer = flow_params.create_buffer(); assert!(buffer.deduct_cost(101.into()).is_err()); diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index d7bcc4b39..4f0543526 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -97,7 +97,7 @@ pub struct LightProtocol { genesis_hash: H256, mainnet: bool, peers: RwLock>, - pending_requests: RwLock>, + pending_requests: RwLock>, req_id: AtomicUsize, } diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs index d89c6ac79..23b8b68c0 100644 --- a/ethcore/light/src/net/status.rs +++ b/ethcore/light/src/net/status.rs @@ -16,9 +16,75 @@ //! Peer status and capabilities. -use rlp::{RlpStream, Stream, UntrustedRlp, View}; +use rlp::{DecoderError, RlpDecodable, RlpEncodable, RlpStream, Stream, UntrustedRlp, View}; use util::{H256, U256}; +use super::buffer_flow::{CostTable, FlowParams}; + +// recognized handshake/announcement keys. +// unknown keys are to be skipped, known keys have a defined order. +// their string values are defined in the LES spec. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum Key { + ProtocolVersion, + NetworkId, + HeadTD, + HeadHash, + HeadNum, + GenesisHash, + ServeHeaders, + ServeChainSince, + ServeStateSince, + TxRelay, + BufferLimit, + BufferCostTable, + BufferRechargeRate, +} + +impl Key { + // get the string value of this key. + fn as_str(&self) -> &'static str { + match *self { + Key::ProtocolVersion => "protocolVersion", + Key::NetworkId => "networkId", + Key::HeadTD => "headTd", + Key::HeadHash => "headHash", + Key::HeadNum => "headNum", + Key::GenesisHash => "genesisHash", + Key::ServeHeaders => "serveHeaders", + Key::ServeChainSince => "serveChainSince", + Key::ServeStateSince => "serveStateSince", + Key::TxRelay => "txRelay", + Key::BufferLimit => "flowControl/BL", + Key::BufferCostTable => "flowControl/MRC", + Key::BufferRechargeRate => "flowControl/MRR", + } + } + + fn from_str(s: &str) -> Option { + match s { + "protocolVersion" => Some(Key::ProtocolVersion), + "networkId" => Some(Key::NetworkId), + "headTd" => Some(Key::HeadTD), + "headHash" => Some(Key::HeadHash), + "headNum" => Some(Key::HeadNum), + "genesisHash" => Some(Key::GenesisHash), + "serveHeaders" => Some(Key::ServeHeaders), + "serveChainSince" => Some(Key::ServeChainSince), + "serveStateSince" => Some(Key::ServeStateSince), + "txRelay" => Some(Key::TxRelay), + "flowControl/BL" => Some(Key::BufferLimit), + "flowControl/MRC" => Some(Key::BufferCostTable), + "flowControl/MRR" => Some(Key::BufferRechargeRate), + _ => None + } + } + + fn is_recognized(s: &str) -> bool { + Key::from_str(s).is_some() + } +} + /// Network ID structure. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u32)] @@ -29,6 +95,66 @@ pub enum NetworkId { Testnet = 0, } +impl NetworkId { + fn from_raw(raw: u32) -> Option { + match raw { + 0 => Some(NetworkId::Testnet), + 1 => Some(NetworkId::Mainnet), + _ => None, + } + } +} + +// helper for decoding key-value pairs in the handshake or an announcement. +struct Parser<'a> { + pos: usize, + rlp: UntrustedRlp<'a>, +} + +impl<'a> Parser<'a> { + // attempt to parse the next key, value pair, and decode the value to the given type. + fn expect(&mut self, key: Key) -> Result { + self.expect_raw(key).and_then(|item| item.as_val()) + } + + // attempt to parse the next key, value pair, and returns the value's RLP. + fn expect_raw(&mut self, key: Key) -> Result, DecoderError> { + loop { + let pair = try!(self.rlp.at(self.pos)); + let k: String = try!(pair.val_at(0)); + let k = match Key::from_str(&k) { + Some(k) => k, + None => { + // skip any unrecognized keys. + self.pos += 1; + continue; + } + }; + + if k == key { + self.pos += 1; + return pair.at(1) + } else { + return Err(DecoderError::Custom("Missing expected key")) + } + } + } +} + +// Helper for encoding a key-value pair +fn encode_pair(key: Key, val: &T) -> Vec { + let mut s = RlpStream::new_list(2); + s.append(&key.as_str()).append(val); + s.out() +} + +// Helper for encoding a flag. +fn encode_flag(key: Key) -> Vec { + let mut s = RlpStream::new_list(2); + s.append(&key.as_str()).append_empty_data(); + s.out() +} + /// A peer status message. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Status { @@ -72,26 +198,80 @@ impl Default for Capabilities { } } -impl Capabilities { - /// Decode capabilities from the given rlp stream, starting from the given - /// index. - fn decode_from(rlp: &UntrustedRlp, start_idx: usize) -> Result { - let mut caps = Capabilities::default(); +/// Attempt to parse a handshake message into its three parts: +/// - chain status +/// - serving capabilities +/// - buffer flow parameters +pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> { + let mut parser = Parser { + pos: 0, + rlp: rlp, + }; - for item in rlp.iter().skip(start_idx).take(4) { - let key: String = try!(item.val_at(0)); + let status = Status { + protocol_version: try!(parser.expect(Key::ProtocolVersion)), + network_id: try!(parser.expect(Key::NetworkId) + .and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))), + head_td: try!(parser.expect(Key::HeadTD)), + head_hash: try!(parser.expect(Key::HeadHash)), + head_num: try!(parser.expect(Key::HeadNum)), + genesis_hash: parser.expect(Key::GenesisHash).ok(), + last_head: None, + }; - match &*key { - "serveHeaders" => caps.serve_headers = true, - "serveChainSince" => caps.serve_chain_since = Some(try!(item.val_at(1))), - "serveStateSince" => caps.serve_state_since = Some(try!(item.val_at(1))), - "txRelay" => caps.tx_relay = true, - _ => continue, - } - } + let capabilities = Capabilities { + serve_headers: parser.expect_raw(Key::ServeHeaders).is_ok(), + serve_chain_since: parser.expect(Key::ServeChainSince).ok(), + serve_state_since: parser.expect(Key::ServeStateSince).ok(), + tx_relay: parser.expect_raw(Key::TxRelay).is_ok(), + }; - Ok(caps) + let flow_params = FlowParams::new( + try!(parser.expect(Key::BufferLimit)), + try!(parser.expect(Key::BufferCostTable)), + try!(parser.expect(Key::BufferRechargeRate)), + ); + + Ok((status, capabilities, flow_params)) +} + +/// Write a handshake, given status, capabilities, and flow parameters. +pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec { + let mut pairs = Vec::new(); + pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); + pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32))); + pairs.push(encode_pair(Key::HeadTD, &status.head_td)); + pairs.push(encode_pair(Key::HeadHash, &status.head_hash)); + pairs.push(encode_pair(Key::HeadNum, &status.head_num)); + + if let Some(ref genesis_hash) = status.genesis_hash { + pairs.push(encode_pair(Key::GenesisHash, genesis_hash)); } + + if capabilities.serve_headers { + pairs.push(encode_flag(Key::ServeHeaders)); + } + if let Some(ref serve_chain_since) = capabilities.serve_chain_since { + pairs.push(encode_pair(Key::ServeChainSince, serve_chain_since)); + } + if let Some(ref serve_state_since) = capabilities.serve_state_since { + pairs.push(encode_pair(Key::ServeStateSince, serve_state_since)); + } + if capabilities.tx_relay { + pairs.push(encode_flag(Key::TxRelay)); + } + + pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); + pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); + pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); + + let mut stream = RlpStream::new_list(pairs.len()); + + for pair in pairs { + stream.append_raw(&pair, 1); + } + + stream.out() } /// An announcement of new chain head or capabilities made by a peer. @@ -105,6 +285,137 @@ pub struct Announcement { head_td: U256, /// reorg depth to common ancestor of last announced head. reorg_depth: u64, - /// updated capabilities. - new_capabilities: Capabilities, + /// optional new state-serving capability + serve_state_since: Option, + /// optional new chain-serving capability + serve_chain_since: Option, + // TODO: changes in buffer flow? +} + +#[cfg(test)] +mod tests { + use super::*; + use super::super::buffer_flow::FlowParams; + use util::{U256, H256, FixedHash}; + use rlp::{RlpStream, Stream ,UntrustedRlp, View}; + + #[test] + fn full_handshake() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: Some(H256::zero()), + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: true, + serve_chain_since: Some(5), + serve_state_since: Some(8), + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } + + #[test] + fn partial_handshake() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: None, + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: false, + serve_chain_since: Some(5), + serve_state_since: None, + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } + + #[test] + fn skip_unknown_keys() { + let status = Status { + protocol_version: 1, + network_id: NetworkId::Mainnet, + head_td: U256::default(), + head_hash: H256::default(), + head_num: 10, + genesis_hash: None, + last_head: None, + }; + + let capabilities = Capabilities { + serve_headers: false, + serve_chain_since: Some(5), + serve_state_since: None, + tx_relay: true, + }; + + let flow_params = FlowParams::new( + 1_000_000.into(), + Default::default(), + 1000.into(), + ); + + let handshake = write_handshake(&status, &capabilities, &flow_params); + let interleaved = { + let handshake = UntrustedRlp::new(&handshake); + let mut stream = RlpStream::new_list(handshake.item_count() * 3); + + for item in handshake.iter() { + stream.append_raw(item.as_raw(), 1); + let (mut s1, mut s2) = (RlpStream::new_list(2), RlpStream::new_list(2)); + s1.append(&"foo").append_empty_data(); + s2.append(&"bar").append_empty_data(); + stream.append_raw(&s1.out(), 1); + stream.append_raw(&s2.out(), 1); + } + + stream.out() + }; + + let (read_status, read_capabilities, read_flow) + = parse_handshake(UntrustedRlp::new(&interleaved)).unwrap(); + + assert_eq!(read_status, status); + assert_eq!(read_capabilities, capabilities); + assert_eq!(read_flow, flow_params); + } } \ No newline at end of file