implement handshake parsing and creation
This commit is contained in:
parent
4ba4861734
commit
440f5e537f
@ -171,7 +171,7 @@ pub struct FlowParams {
|
|||||||
impl FlowParams {
|
impl FlowParams {
|
||||||
/// Create new flow parameters from a request cost table,
|
/// Create new flow parameters from a request cost table,
|
||||||
/// buffer limit, and (minimum) rate of recharge.
|
/// buffer limit, and (minimum) rate of recharge.
|
||||||
pub fn new(costs: CostTable, limit: U256, recharge: U256) -> Self {
|
pub fn new(limit: U256, costs: CostTable, recharge: U256) -> Self {
|
||||||
FlowParams {
|
FlowParams {
|
||||||
costs: costs,
|
costs: costs,
|
||||||
limit: limit,
|
limit: limit,
|
||||||
@ -179,6 +179,15 @@ impl FlowParams {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the buffer limit.
|
||||||
|
pub fn limit(&self) -> &U256 { &self.limit }
|
||||||
|
|
||||||
|
/// Get a reference to the cost table.
|
||||||
|
pub fn cost_table(&self) -> &CostTable { &self.costs }
|
||||||
|
|
||||||
|
/// Get a reference to the recharge rate.
|
||||||
|
pub fn recharge_rate(&self) -> &U256 { &self.recharge }
|
||||||
|
|
||||||
/// Estimate the maximum cost of the request.
|
/// Estimate the maximum cost of the request.
|
||||||
pub fn max_cost(&self, req: &Request) -> U256 {
|
pub fn max_cost(&self, req: &Request) -> U256 {
|
||||||
let amount = match *req {
|
let amount = match *req {
|
||||||
@ -253,7 +262,7 @@ mod tests {
|
|||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
let flow_params = FlowParams::new(Default::default(), 100.into(), 20.into());
|
let flow_params = FlowParams::new(100.into(), Default::default(), 20.into());
|
||||||
let mut buffer = flow_params.create_buffer();
|
let mut buffer = flow_params.create_buffer();
|
||||||
|
|
||||||
assert!(buffer.deduct_cost(101.into()).is_err());
|
assert!(buffer.deduct_cost(101.into()).is_err());
|
||||||
|
@ -97,7 +97,7 @@ pub struct LightProtocol {
|
|||||||
genesis_hash: H256,
|
genesis_hash: H256,
|
||||||
mainnet: bool,
|
mainnet: bool,
|
||||||
peers: RwLock<HashMap<PeerId, Peer>>,
|
peers: RwLock<HashMap<PeerId, Peer>>,
|
||||||
pending_requests: RwLock<HashMap<usize, Requested>>,
|
pending_requests: RwLock<HashMap<usize, Request>>,
|
||||||
req_id: AtomicUsize,
|
req_id: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,9 +16,75 @@
|
|||||||
|
|
||||||
//! Peer status and capabilities.
|
//! Peer status and capabilities.
|
||||||
|
|
||||||
use rlp::{RlpStream, Stream, UntrustedRlp, View};
|
use rlp::{DecoderError, RlpDecodable, RlpEncodable, RlpStream, Stream, UntrustedRlp, View};
|
||||||
use util::{H256, U256};
|
use util::{H256, U256};
|
||||||
|
|
||||||
|
use super::buffer_flow::{CostTable, FlowParams};
|
||||||
|
|
||||||
|
// recognized handshake/announcement keys.
|
||||||
|
// unknown keys are to be skipped, known keys have a defined order.
|
||||||
|
// their string values are defined in the LES spec.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
enum Key {
|
||||||
|
ProtocolVersion,
|
||||||
|
NetworkId,
|
||||||
|
HeadTD,
|
||||||
|
HeadHash,
|
||||||
|
HeadNum,
|
||||||
|
GenesisHash,
|
||||||
|
ServeHeaders,
|
||||||
|
ServeChainSince,
|
||||||
|
ServeStateSince,
|
||||||
|
TxRelay,
|
||||||
|
BufferLimit,
|
||||||
|
BufferCostTable,
|
||||||
|
BufferRechargeRate,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Key {
|
||||||
|
// get the string value of this key.
|
||||||
|
fn as_str(&self) -> &'static str {
|
||||||
|
match *self {
|
||||||
|
Key::ProtocolVersion => "protocolVersion",
|
||||||
|
Key::NetworkId => "networkId",
|
||||||
|
Key::HeadTD => "headTd",
|
||||||
|
Key::HeadHash => "headHash",
|
||||||
|
Key::HeadNum => "headNum",
|
||||||
|
Key::GenesisHash => "genesisHash",
|
||||||
|
Key::ServeHeaders => "serveHeaders",
|
||||||
|
Key::ServeChainSince => "serveChainSince",
|
||||||
|
Key::ServeStateSince => "serveStateSince",
|
||||||
|
Key::TxRelay => "txRelay",
|
||||||
|
Key::BufferLimit => "flowControl/BL",
|
||||||
|
Key::BufferCostTable => "flowControl/MRC",
|
||||||
|
Key::BufferRechargeRate => "flowControl/MRR",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Option<Self> {
|
||||||
|
match s {
|
||||||
|
"protocolVersion" => Some(Key::ProtocolVersion),
|
||||||
|
"networkId" => Some(Key::NetworkId),
|
||||||
|
"headTd" => Some(Key::HeadTD),
|
||||||
|
"headHash" => Some(Key::HeadHash),
|
||||||
|
"headNum" => Some(Key::HeadNum),
|
||||||
|
"genesisHash" => Some(Key::GenesisHash),
|
||||||
|
"serveHeaders" => Some(Key::ServeHeaders),
|
||||||
|
"serveChainSince" => Some(Key::ServeChainSince),
|
||||||
|
"serveStateSince" => Some(Key::ServeStateSince),
|
||||||
|
"txRelay" => Some(Key::TxRelay),
|
||||||
|
"flowControl/BL" => Some(Key::BufferLimit),
|
||||||
|
"flowControl/MRC" => Some(Key::BufferCostTable),
|
||||||
|
"flowControl/MRR" => Some(Key::BufferRechargeRate),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_recognized(s: &str) -> bool {
|
||||||
|
Key::from_str(s).is_some()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Network ID structure.
|
/// Network ID structure.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
@ -29,6 +95,66 @@ pub enum NetworkId {
|
|||||||
Testnet = 0,
|
Testnet = 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl NetworkId {
|
||||||
|
fn from_raw(raw: u32) -> Option<Self> {
|
||||||
|
match raw {
|
||||||
|
0 => Some(NetworkId::Testnet),
|
||||||
|
1 => Some(NetworkId::Mainnet),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// helper for decoding key-value pairs in the handshake or an announcement.
|
||||||
|
struct Parser<'a> {
|
||||||
|
pos: usize,
|
||||||
|
rlp: UntrustedRlp<'a>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Parser<'a> {
|
||||||
|
// attempt to parse the next key, value pair, and decode the value to the given type.
|
||||||
|
fn expect<T: RlpDecodable>(&mut self, key: Key) -> Result<T, DecoderError> {
|
||||||
|
self.expect_raw(key).and_then(|item| item.as_val())
|
||||||
|
}
|
||||||
|
|
||||||
|
// attempt to parse the next key, value pair, and returns the value's RLP.
|
||||||
|
fn expect_raw(&mut self, key: Key) -> Result<UntrustedRlp<'a>, DecoderError> {
|
||||||
|
loop {
|
||||||
|
let pair = try!(self.rlp.at(self.pos));
|
||||||
|
let k: String = try!(pair.val_at(0));
|
||||||
|
let k = match Key::from_str(&k) {
|
||||||
|
Some(k) => k,
|
||||||
|
None => {
|
||||||
|
// skip any unrecognized keys.
|
||||||
|
self.pos += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if k == key {
|
||||||
|
self.pos += 1;
|
||||||
|
return pair.at(1)
|
||||||
|
} else {
|
||||||
|
return Err(DecoderError::Custom("Missing expected key"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper for encoding a key-value pair
|
||||||
|
fn encode_pair<T: RlpEncodable>(key: Key, val: &T) -> Vec<u8> {
|
||||||
|
let mut s = RlpStream::new_list(2);
|
||||||
|
s.append(&key.as_str()).append(val);
|
||||||
|
s.out()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper for encoding a flag.
|
||||||
|
fn encode_flag(key: Key) -> Vec<u8> {
|
||||||
|
let mut s = RlpStream::new_list(2);
|
||||||
|
s.append(&key.as_str()).append_empty_data();
|
||||||
|
s.out()
|
||||||
|
}
|
||||||
|
|
||||||
/// A peer status message.
|
/// A peer status message.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub struct Status {
|
pub struct Status {
|
||||||
@ -72,26 +198,80 @@ impl Default for Capabilities {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Capabilities {
|
/// Attempt to parse a handshake message into its three parts:
|
||||||
/// Decode capabilities from the given rlp stream, starting from the given
|
/// - chain status
|
||||||
/// index.
|
/// - serving capabilities
|
||||||
fn decode_from(rlp: &UntrustedRlp, start_idx: usize) -> Result<Self, DecoderError> {
|
/// - buffer flow parameters
|
||||||
let mut caps = Capabilities::default();
|
pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowParams), DecoderError> {
|
||||||
|
let mut parser = Parser {
|
||||||
|
pos: 0,
|
||||||
|
rlp: rlp,
|
||||||
|
};
|
||||||
|
|
||||||
for item in rlp.iter().skip(start_idx).take(4) {
|
let status = Status {
|
||||||
let key: String = try!(item.val_at(0));
|
protocol_version: try!(parser.expect(Key::ProtocolVersion)),
|
||||||
|
network_id: try!(parser.expect(Key::NetworkId)
|
||||||
|
.and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))),
|
||||||
|
head_td: try!(parser.expect(Key::HeadTD)),
|
||||||
|
head_hash: try!(parser.expect(Key::HeadHash)),
|
||||||
|
head_num: try!(parser.expect(Key::HeadNum)),
|
||||||
|
genesis_hash: parser.expect(Key::GenesisHash).ok(),
|
||||||
|
last_head: None,
|
||||||
|
};
|
||||||
|
|
||||||
match &*key {
|
let capabilities = Capabilities {
|
||||||
"serveHeaders" => caps.serve_headers = true,
|
serve_headers: parser.expect_raw(Key::ServeHeaders).is_ok(),
|
||||||
"serveChainSince" => caps.serve_chain_since = Some(try!(item.val_at(1))),
|
serve_chain_since: parser.expect(Key::ServeChainSince).ok(),
|
||||||
"serveStateSince" => caps.serve_state_since = Some(try!(item.val_at(1))),
|
serve_state_since: parser.expect(Key::ServeStateSince).ok(),
|
||||||
"txRelay" => caps.tx_relay = true,
|
tx_relay: parser.expect_raw(Key::TxRelay).is_ok(),
|
||||||
_ => continue,
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(caps)
|
let flow_params = FlowParams::new(
|
||||||
|
try!(parser.expect(Key::BufferLimit)),
|
||||||
|
try!(parser.expect(Key::BufferCostTable)),
|
||||||
|
try!(parser.expect(Key::BufferRechargeRate)),
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok((status, capabilities, flow_params))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write a handshake, given status, capabilities, and flow parameters.
|
||||||
|
pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec<u8> {
|
||||||
|
let mut pairs = Vec::new();
|
||||||
|
pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version));
|
||||||
|
pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32)));
|
||||||
|
pairs.push(encode_pair(Key::HeadTD, &status.head_td));
|
||||||
|
pairs.push(encode_pair(Key::HeadHash, &status.head_hash));
|
||||||
|
pairs.push(encode_pair(Key::HeadNum, &status.head_num));
|
||||||
|
|
||||||
|
if let Some(ref genesis_hash) = status.genesis_hash {
|
||||||
|
pairs.push(encode_pair(Key::GenesisHash, genesis_hash));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if capabilities.serve_headers {
|
||||||
|
pairs.push(encode_flag(Key::ServeHeaders));
|
||||||
|
}
|
||||||
|
if let Some(ref serve_chain_since) = capabilities.serve_chain_since {
|
||||||
|
pairs.push(encode_pair(Key::ServeChainSince, serve_chain_since));
|
||||||
|
}
|
||||||
|
if let Some(ref serve_state_since) = capabilities.serve_state_since {
|
||||||
|
pairs.push(encode_pair(Key::ServeStateSince, serve_state_since));
|
||||||
|
}
|
||||||
|
if capabilities.tx_relay {
|
||||||
|
pairs.push(encode_flag(Key::TxRelay));
|
||||||
|
}
|
||||||
|
|
||||||
|
pairs.push(encode_pair(Key::BufferLimit, flow_params.limit()));
|
||||||
|
pairs.push(encode_pair(Key::BufferCostTable, flow_params.cost_table()));
|
||||||
|
pairs.push(encode_pair(Key::BufferRechargeRate, flow_params.recharge_rate()));
|
||||||
|
|
||||||
|
let mut stream = RlpStream::new_list(pairs.len());
|
||||||
|
|
||||||
|
for pair in pairs {
|
||||||
|
stream.append_raw(&pair, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.out()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An announcement of new chain head or capabilities made by a peer.
|
/// An announcement of new chain head or capabilities made by a peer.
|
||||||
@ -105,6 +285,137 @@ pub struct Announcement {
|
|||||||
head_td: U256,
|
head_td: U256,
|
||||||
/// reorg depth to common ancestor of last announced head.
|
/// reorg depth to common ancestor of last announced head.
|
||||||
reorg_depth: u64,
|
reorg_depth: u64,
|
||||||
/// updated capabilities.
|
/// optional new state-serving capability
|
||||||
new_capabilities: Capabilities,
|
serve_state_since: Option<u64>,
|
||||||
|
/// optional new chain-serving capability
|
||||||
|
serve_chain_since: Option<u64>,
|
||||||
|
// TODO: changes in buffer flow?
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use super::super::buffer_flow::FlowParams;
|
||||||
|
use util::{U256, H256, FixedHash};
|
||||||
|
use rlp::{RlpStream, Stream ,UntrustedRlp, View};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn full_handshake() {
|
||||||
|
let status = Status {
|
||||||
|
protocol_version: 1,
|
||||||
|
network_id: NetworkId::Mainnet,
|
||||||
|
head_td: U256::default(),
|
||||||
|
head_hash: H256::default(),
|
||||||
|
head_num: 10,
|
||||||
|
genesis_hash: Some(H256::zero()),
|
||||||
|
last_head: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let capabilities = Capabilities {
|
||||||
|
serve_headers: true,
|
||||||
|
serve_chain_since: Some(5),
|
||||||
|
serve_state_since: Some(8),
|
||||||
|
tx_relay: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let flow_params = FlowParams::new(
|
||||||
|
1_000_000.into(),
|
||||||
|
Default::default(),
|
||||||
|
1000.into(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let handshake = write_handshake(&status, &capabilities, &flow_params);
|
||||||
|
|
||||||
|
let (read_status, read_capabilities, read_flow)
|
||||||
|
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(read_status, status);
|
||||||
|
assert_eq!(read_capabilities, capabilities);
|
||||||
|
assert_eq!(read_flow, flow_params);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn partial_handshake() {
|
||||||
|
let status = Status {
|
||||||
|
protocol_version: 1,
|
||||||
|
network_id: NetworkId::Mainnet,
|
||||||
|
head_td: U256::default(),
|
||||||
|
head_hash: H256::default(),
|
||||||
|
head_num: 10,
|
||||||
|
genesis_hash: None,
|
||||||
|
last_head: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let capabilities = Capabilities {
|
||||||
|
serve_headers: false,
|
||||||
|
serve_chain_since: Some(5),
|
||||||
|
serve_state_since: None,
|
||||||
|
tx_relay: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let flow_params = FlowParams::new(
|
||||||
|
1_000_000.into(),
|
||||||
|
Default::default(),
|
||||||
|
1000.into(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let handshake = write_handshake(&status, &capabilities, &flow_params);
|
||||||
|
|
||||||
|
let (read_status, read_capabilities, read_flow)
|
||||||
|
= parse_handshake(UntrustedRlp::new(&handshake)).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(read_status, status);
|
||||||
|
assert_eq!(read_capabilities, capabilities);
|
||||||
|
assert_eq!(read_flow, flow_params);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn skip_unknown_keys() {
|
||||||
|
let status = Status {
|
||||||
|
protocol_version: 1,
|
||||||
|
network_id: NetworkId::Mainnet,
|
||||||
|
head_td: U256::default(),
|
||||||
|
head_hash: H256::default(),
|
||||||
|
head_num: 10,
|
||||||
|
genesis_hash: None,
|
||||||
|
last_head: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let capabilities = Capabilities {
|
||||||
|
serve_headers: false,
|
||||||
|
serve_chain_since: Some(5),
|
||||||
|
serve_state_since: None,
|
||||||
|
tx_relay: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let flow_params = FlowParams::new(
|
||||||
|
1_000_000.into(),
|
||||||
|
Default::default(),
|
||||||
|
1000.into(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let handshake = write_handshake(&status, &capabilities, &flow_params);
|
||||||
|
let interleaved = {
|
||||||
|
let handshake = UntrustedRlp::new(&handshake);
|
||||||
|
let mut stream = RlpStream::new_list(handshake.item_count() * 3);
|
||||||
|
|
||||||
|
for item in handshake.iter() {
|
||||||
|
stream.append_raw(item.as_raw(), 1);
|
||||||
|
let (mut s1, mut s2) = (RlpStream::new_list(2), RlpStream::new_list(2));
|
||||||
|
s1.append(&"foo").append_empty_data();
|
||||||
|
s2.append(&"bar").append_empty_data();
|
||||||
|
stream.append_raw(&s1.out(), 1);
|
||||||
|
stream.append_raw(&s2.out(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.out()
|
||||||
|
};
|
||||||
|
|
||||||
|
let (read_status, read_capabilities, read_flow)
|
||||||
|
= parse_handshake(UntrustedRlp::new(&interleaved)).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(read_status, status);
|
||||||
|
assert_eq!(read_capabilities, capabilities);
|
||||||
|
assert_eq!(read_flow, flow_params);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user