les: make peer buffer flow params optional

This commit is contained in:
Robert Habermeier 2016-12-09 01:06:51 +01:00
parent d53c47aa69
commit 07d7a37319
4 changed files with 95 additions and 49 deletions

View File

@ -56,6 +56,8 @@ pub enum Error {
UnknownPeer, UnknownPeer,
/// Unsolicited response. /// Unsolicited response.
UnsolicitedResponse, UnsolicitedResponse,
/// Not a server.
NotServer,
} }
impl Error { impl Error {
@ -70,6 +72,7 @@ impl Error {
Error::WrongNetwork => Punishment::Disable, Error::WrongNetwork => Punishment::Disable,
Error::UnknownPeer => Punishment::Disconnect, Error::UnknownPeer => Punishment::Disconnect,
Error::UnsolicitedResponse => Punishment::Disable, Error::UnsolicitedResponse => Punishment::Disable,
Error::NotServer => Punishment::Disable,
} }
} }
} }
@ -97,6 +100,7 @@ impl fmt::Display for Error {
Error::WrongNetwork => write!(f, "Wrong network"), Error::WrongNetwork => write!(f, "Wrong network"),
Error::UnknownPeer => write!(f, "Unknown peer"), Error::UnknownPeer => write!(f, "Unknown peer"),
Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"), Error::UnsolicitedResponse => write!(f, "Peer provided unsolicited data"),
Error::NotServer => write!(f, "Peer not a server."),
} }
} }
} }

View File

@ -116,10 +116,9 @@ struct PendingPeer {
// data about each peer. // data about each peer.
struct Peer { struct Peer {
local_buffer: Buffer, // their buffer relative to us local_buffer: Buffer, // their buffer relative to us
remote_buffer: Buffer, // our buffer relative to them
status: Status, status: Status,
capabilities: Capabilities, capabilities: Capabilities,
remote_flow: FlowParams, remote_flow: Option<(Buffer, FlowParams)>,
sent_head: H256, // last head we've given them. sent_head: H256, // last head we've given them.
last_update: SteadyTime, last_update: SteadyTime,
} }
@ -142,12 +141,6 @@ impl Peer {
self.local_buffer.current() self.local_buffer.current()
} }
// recharge remote buffer with remote flow params.
fn recharge_remote(&mut self) {
let flow = &mut self.remote_flow;
flow.recharge(&mut self.remote_buffer);
}
} }
/// An LES event handler. /// An LES event handler.
@ -250,16 +243,21 @@ impl LightProtocol {
/// Check the maximum amount of requests of a specific type /// Check the maximum amount of requests of a specific type
/// which a peer would be able to serve. /// which a peer would be able to serve.
pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> { pub fn max_requests(&self, peer: PeerId, kind: request::Kind) -> Option<usize> {
self.peers.read().get(&peer).map(|peer| { self.peers.read().get(&peer).and_then(|peer| {
let mut peer = peer.lock(); let mut peer = peer.lock();
peer.recharge_remote(); match peer.remote_flow.as_mut() {
peer.remote_flow.max_amount(&peer.remote_buffer, kind) Some(&mut (ref mut buf, ref flow)) => {
flow.recharge(buf);
Some(flow.max_amount(&*buf, kind))
}
None => None,
}
}) })
} }
/// Make a request to a peer. /// Make a request to a peer.
/// ///
/// Fails on: nonexistent peer, network error, /// Fails on: nonexistent peer, network error, peer not server,
/// insufficient buffer. Does not check capabilities before sending. /// insufficient buffer. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated /// On success, returns a request id which can later be coordinated
/// with an event. /// with an event.
@ -268,10 +266,14 @@ impl LightProtocol {
let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer)); let peer = try!(peers.get(peer_id).ok_or_else(|| Error::UnknownPeer));
let mut peer = peer.lock(); let mut peer = peer.lock();
peer.recharge_remote(); match peer.remote_flow.as_mut() {
Some(&mut (ref mut buf, ref flow)) => {
let max = peer.remote_flow.compute_cost(request.kind(), request.amount()); flow.recharge(buf);
try!(peer.remote_buffer.deduct_cost(max)); let max = flow.compute_cost(request.kind(), request.amount());
try!(buf.deduct_cost(max));
}
None => return Err(Error::NotServer),
}
let req_id = self.req_id.fetch_add(1, Ordering::SeqCst); let req_id = self.req_id.fetch_add(1, Ordering::SeqCst);
let packet_data = encode_request(&request, req_id); let packet_data = encode_request(&request, req_id);
@ -390,8 +392,13 @@ impl LightProtocol {
match peers.get(peer) { match peers.get(peer) {
Some(peer_info) => { Some(peer_info) => {
let mut peer_info = peer_info.lock(); let mut peer_info = peer_info.lock();
let actual_buffer = ::std::cmp::min(cur_buffer, *peer_info.remote_flow.limit()); match peer_info.remote_flow.as_mut() {
peer_info.remote_buffer.update_to(actual_buffer); Some(&mut (ref mut buf, ref mut flow)) => {
let actual_buffer = ::std::cmp::min(cur_buffer, *flow.limit());
buf.update_to(actual_buffer)
}
None => return Err(Error::NotServer), // this really should be impossible.
}
Ok(ReqId(req_id)) Ok(ReqId(req_id))
} }
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind. None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
@ -516,7 +523,7 @@ impl LightProtocol {
}; };
let capabilities = self.capabilities.read().clone(); let capabilities = self.capabilities.read().clone();
let status_packet = status::write_handshake(&status, &capabilities, &self.flow_params); let status_packet = status::write_handshake(&status, &capabilities, Some(&self.flow_params));
io.send(peer, packet::STATUS, status_packet); io.send(peer, packet::STATUS, status_packet);
@ -543,12 +550,13 @@ impl LightProtocol {
return Err(Error::WrongNetwork); return Err(Error::WrongNetwork);
} }
let remote_flow = flow_params.map(|params| (params.create_buffer(), params));
self.peers.write().insert(*peer, Mutex::new(Peer { self.peers.write().insert(*peer, Mutex::new(Peer {
local_buffer: self.flow_params.create_buffer(), local_buffer: self.flow_params.create_buffer(),
remote_buffer: flow_params.create_buffer(),
status: status.clone(), status: status.clone(),
capabilities: capabilities.clone(), capabilities: capabilities.clone(),
remote_flow: flow_params, remote_flow: remote_flow,
sent_head: pending.sent_head, sent_head: pending.sent_head,
last_update: pending.last_update, last_update: pending.last_update,
})); }));

View File

@ -198,7 +198,7 @@ impl Capabilities {
/// - chain status /// - chain status
/// - serving capabilities /// - serving capabilities
/// - buffer flow parameters /// - buffer flow parameters
pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> { pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, Option<FlowParams>), DecoderError> {
let mut parser = Parser { let mut parser = Parser {
pos: 0, pos: 0,
rlp: rlp, rlp: rlp,
@ -221,17 +221,20 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP
tx_relay: parser.expect_raw(Key::TxRelay).is_ok(), tx_relay: parser.expect_raw(Key::TxRelay).is_ok(),
}; };
let flow_params = FlowParams::new( let flow_params = match (
try!(parser.expect(Key::BufferLimit)), parser.expect(Key::BufferLimit),
try!(parser.expect(Key::BufferCostTable)), parser.expect(Key::BufferCostTable),
try!(parser.expect(Key::BufferRechargeRate)), parser.expect(Key::BufferRechargeRate)
); ) {
(Ok(bl), Ok(bct), Ok(brr)) => Some(FlowParams::new(bl, bct, brr)),
_ => None,
};
Ok((status, capabilities, flow_params)) Ok((status, capabilities, flow_params))
} }
/// Write a handshake, given status, capabilities, and flow parameters. /// Write a handshake, given status, capabilities, and flow parameters.
pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec<u8> { pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: Option<&FlowParams>) -> Vec<u8> {
let mut pairs = Vec::new(); let mut pairs = Vec::new();
pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version));
pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64))); pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64)));
@ -253,9 +256,11 @@ pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params
pairs.push(encode_flag(Key::TxRelay)); pairs.push(encode_flag(Key::TxRelay));
} }
if let Some(flow_params) = flow_params {
pairs.push(encode_pair(Key::BufferLimit, flow_params.limit())); pairs.push(encode_pair(Key::BufferLimit, flow_params.limit()));
pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table())); pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table()));
pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate())); pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate()));
}
let mut stream = RlpStream::new_list(pairs.len()); let mut stream = RlpStream::new_list(pairs.len());
@ -386,14 +391,14 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let (read_status, read_capabilities, read_flow) let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); = parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
@ -421,14 +426,14 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let (read_status, read_capabilities, read_flow) let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap(); = parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
@ -456,7 +461,7 @@ mod tests {
1000.into(), 1000.into(),
); );
let handshake = write_handshake(&status, &capabilities, &flow_params); let handshake = write_handshake(&status, &capabilities, Some(&flow_params));
let interleaved = { let interleaved = {
let handshake = UntrustedRlp::new(&handshake); let handshake = UntrustedRlp::new(&handshake);
let mut stream = RlpStream::new_list(handshake.item_count() * 3); let mut stream = RlpStream::new_list(handshake.item_count() * 3);
@ -478,7 +483,7 @@ mod tests {
assert_eq!(read_status, status); assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities); assert_eq!(read_capabilities, capabilities);
assert_eq!(read_flow, flow_params); assert_eq!(read_flow.unwrap(), flow_params);
} }
#[test] #[test]
@ -528,4 +533,33 @@ mod tests {
let out = stream.drain(); let out = stream.drain();
assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok()); assert!(parse_announcement(UntrustedRlp::new(&out)).is_ok());
} }
#[test]
fn optional_flow() {
let status = Status {
protocol_version: 1,
network_id: 1,
head_td: U256::default(),
head_hash: H256::default(),
head_num: 10,
genesis_hash: H256::zero(),
last_head: None,
};
let capabilities = Capabilities {
serve_headers: true,
serve_chain_since: Some(5),
serve_state_since: Some(8),
tx_relay: true,
};
let handshake = write_handshake(&status, &capabilities, None);
let (read_status, read_capabilities, read_flow)
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
assert_eq!(read_status, status);
assert_eq!(read_capabilities, capabilities);
assert!(read_flow.is_none());
}
} }

View File

@ -204,7 +204,7 @@ fn handshake_expected() {
let status = status(provider.client.chain_info()); let status = status(provider.client.chain_info());
let packet_body = write_handshake(&status, &capabilities, &flow_params); let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
} }
@ -220,7 +220,7 @@ fn genesis_mismatch() {
let mut status = status(provider.client.chain_info()); let mut status = status(provider.client.chain_info());
status.genesis_hash = H256::default(); status.genesis_hash = H256::default();
let packet_body = write_handshake(&status, &capabilities, &flow_params); let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
} }
@ -235,12 +235,12 @@ fn buffer_overflow() {
let status = status(provider.client.chain_info()); let status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&status, &capabilities, &flow_params); let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
} }
{ {
let my_status = write_handshake(&status, &capabilities, &flow_params); let my_status = write_handshake(&status, &capabilities, Some(&flow_params));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -267,14 +267,14 @@ fn get_block_headers() {
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, &flow_params); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(100, EachBlockWith::Nothing); provider.client.add_blocks(100, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -317,14 +317,14 @@ fn get_block_bodies() {
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, &flow_params); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(100, EachBlockWith::Nothing); provider.client.add_blocks(100, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -364,14 +364,14 @@ fn get_block_receipts() {
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, &flow_params); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
provider.client.add_blocks(1000, EachBlockWith::Nothing); provider.client.add_blocks(1000, EachBlockWith::Nothing);
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -419,7 +419,7 @@ fn get_state_proofs() {
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
} }
@ -468,7 +468,7 @@ fn get_contract_code() {
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, &flow_params); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
} }