Returning persistent node id

This commit is contained in:
Tomasz Drwięga 2016-12-10 16:51:50 +01:00
parent e66157f922
commit aaf6da4c00
4 changed files with 70 additions and 50 deletions

View File

@ -16,13 +16,13 @@
//! I/O and event context generalizations. //! I/O and event context generalizations.
use network::{NetworkContext, PeerId}; use network::{NetworkContext, PeerId, NodeId};
use super::{Announcement, LightProtocol, ReqId}; use super::{Announcement, LightProtocol, ReqId};
use super::error::Error; use super::error::Error;
use request::Request; use request::Request;
/// An I/O context which allows sending and receiving packets as well as /// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions /// disconnecting peers. This is used as a generalization of the portions
/// of a p2p network which the light protocol structure makes use of. /// of a p2p network which the light protocol structure makes use of.
pub trait IoContext { pub trait IoContext {
@ -41,6 +41,9 @@ pub trait IoContext {
/// Get a peer's protocol version. /// Get a peer's protocol version.
fn protocol_version(&self, peer: PeerId) -> Option<u8>; fn protocol_version(&self, peer: PeerId) -> Option<u8>;
/// Persistent peer id
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
} }
impl<'a> IoContext for NetworkContext<'a> { impl<'a> IoContext for NetworkContext<'a> {
@ -67,6 +70,10 @@ impl<'a> IoContext for NetworkContext<'a> {
fn protocol_version(&self, peer: PeerId) -> Option<u8> { fn protocol_version(&self, peer: PeerId) -> Option<u8> {
self.protocol_version(self.subprotocol_name(), peer) self.protocol_version(self.subprotocol_name(), peer)
} }
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId> {
self.session_info(peer).and_then(|info| info.id)
}
} }
/// Context for a protocol event. /// Context for a protocol event.
@ -75,6 +82,9 @@ pub trait EventContext {
/// disconnected/connected peer. /// disconnected/connected peer.
fn peer(&self) -> PeerId; fn peer(&self) -> PeerId;
/// Returns the relevant's peer persistent Id (aka NodeId).
fn persistent_peer_id(&self) -> Option<NodeId>;
/// Make a request from a peer. /// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>; fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>;
@ -89,7 +99,7 @@ pub trait EventContext {
fn disable_peer(&self, peer: PeerId); fn disable_peer(&self, peer: PeerId);
} }
/// Concrete implementation of `EventContext` over the light protocol struct and /// Concrete implementation of `EventContext` over the light protocol struct and
/// an io context. /// an io context.
pub struct Ctx<'a> { pub struct Ctx<'a> {
/// Io context to enable immediate response to events. /// Io context to enable immediate response to events.
@ -97,11 +107,18 @@ pub struct Ctx<'a> {
/// Protocol implementation. /// Protocol implementation.
pub proto: &'a LightProtocol, pub proto: &'a LightProtocol,
/// Relevant peer for event. /// Relevant peer for event.
pub peer: PeerId, pub peer: PeerId,
} }
impl<'a> EventContext for Ctx<'a> { impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId { self.peer }
fn peer(&self) -> PeerId {
self.peer
}
fn persistent_peer_id(&self) -> Option<NodeId> {
self.io.persistent_peer_id(self.peer)
}
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> { fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request) self.proto.request_from(self.io, &peer, request)
} }
@ -117,4 +134,4 @@ impl<'a> EventContext for Ctx<'a> {
fn disable_peer(&self, peer: PeerId) { fn disable_peer(&self, peer: PeerId) {
self.io.disable_peer(peer); self.io.disable_peer(peer);
} }
} }

View File

@ -15,13 +15,13 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>. // along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Tests for the `LightProtocol` implementation. //! Tests for the `LightProtocol` implementation.
//! These don't test of the higher level logic on top of //! These don't test of the higher level logic on top of
use ethcore::blockchain_info::BlockChainInfo; use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient}; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::transaction::SignedTransaction; use ethcore::transaction::SignedTransaction;
use network::PeerId; use network::{PeerId, NodeId};
use net::buffer_flow::FlowParams; use net::buffer_flow::FlowParams;
use net::context::IoContext; use net::context::IoContext;
@ -68,6 +68,10 @@ impl IoContext for Expect {
fn protocol_version(&self, _peer: PeerId) -> Option<u8> { fn protocol_version(&self, _peer: PeerId) -> Option<u8> {
Some(super::MAX_PROTOCOL_VERSION) Some(super::MAX_PROTOCOL_VERSION)
} }
fn persistent_peer_id(&self, _peer: PeerId) -> Option<NodeId> {
None
}
} }
// can't implement directly for Arc due to cross-crate orphan rules. // can't implement directly for Arc due to cross-crate orphan rules.
@ -106,7 +110,7 @@ impl Provider for TestProvider {
.map(|x: u64| x.saturating_mul(req.skip + 1)) .map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x }) .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x }) .map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.0.client.block_header(BlockId::Number(x))) .map(|x| self.0.client.block_header(BlockId::Number(x)))
.take_while(|x| x.is_some()) .take_while(|x| x.is_some())
.flat_map(|x| x) .flat_map(|x| x)
.collect() .collect()
@ -139,12 +143,12 @@ impl Provider for TestProvider {
} }
} }
}) })
.collect() .collect()
} }
fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes> { fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes> {
req.code_requests.into_iter() req.code_requests.into_iter()
.map(|req| { .map(|req| {
req.account_key.iter().chain(req.account_key.iter()).cloned().collect() req.account_key.iter().chain(req.account_key.iter()).cloned().collect()
}) })
.collect() .collect()
@ -202,9 +206,9 @@ fn status(chain_info: BlockChainInfo) -> Status {
#[test] #[test]
fn handshake_expected() { fn handshake_expected() {
let flow_params = make_flow_params(); let flow_params = make_flow_params();
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let status = status(provider.client.chain_info()); let status = status(provider.client.chain_info());
@ -217,9 +221,9 @@ fn handshake_expected() {
#[should_panic] #[should_panic]
fn genesis_mismatch() { fn genesis_mismatch() {
let flow_params = make_flow_params(); let flow_params = make_flow_params();
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let mut status = status(provider.client.chain_info()); let mut status = status(provider.client.chain_info());
status.genesis_hash = H256::default(); status.genesis_hash = H256::default();
@ -232,15 +236,15 @@ fn genesis_mismatch() {
#[test] #[test]
fn buffer_overflow() { fn buffer_overflow() {
let flow_params = make_flow_params(); let flow_params = make_flow_params();
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let status = status(provider.client.chain_info()); let status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
} }
{ {
@ -266,9 +270,9 @@ fn buffer_overflow() {
#[test] #[test]
fn get_block_headers() { fn get_block_headers() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
@ -278,8 +282,8 @@ fn get_block_headers() {
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -300,7 +304,7 @@ fn get_block_headers() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10); let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);
let mut response_stream = RlpStream::new_list(12); let mut response_stream = RlpStream::new_list(12);
response_stream.append(&req_id).append(&new_buf); response_stream.append(&req_id).append(&new_buf);
for header in headers { for header in headers {
response_stream.append_raw(&header, 1); response_stream.append_raw(&header, 1);
@ -316,9 +320,9 @@ fn get_block_headers() {
#[test] #[test]
fn get_block_bodies() { fn get_block_bodies() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
@ -328,8 +332,8 @@ fn get_block_bodies() {
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -347,7 +351,7 @@ fn get_block_bodies() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10); let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);
let mut response_stream = RlpStream::new_list(12); let mut response_stream = RlpStream::new_list(12);
response_stream.append(&req_id).append(&new_buf); response_stream.append(&req_id).append(&new_buf);
for body in bodies { for body in bodies {
response_stream.append_raw(&body, 1); response_stream.append_raw(&body, 1);
@ -363,9 +367,9 @@ fn get_block_bodies() {
#[test] #[test]
fn get_block_receipts() { fn get_block_receipts() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
@ -375,8 +379,8 @@ fn get_block_receipts() {
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
} }
@ -400,7 +404,7 @@ fn get_block_receipts() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len()); let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());
let mut response_stream = RlpStream::new_list(2 + receipts.len()); let mut response_stream = RlpStream::new_list(2 + receipts.len());
response_stream.append(&req_id).append(&new_buf); response_stream.append(&req_id).append(&new_buf);
for block_receipts in receipts { for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1); response_stream.append_raw(&block_receipts, 1);
@ -416,15 +420,15 @@ fn get_block_receipts() {
#[test] #[test]
fn get_state_proofs() { fn get_state_proofs() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
} }
@ -432,7 +436,7 @@ fn get_state_proofs() {
let key1 = U256::from(11223344).into(); let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into(); let key2 = U256::from(99988887).into();
let request = Request::StateProofs (request::StateProofs { let request = Request::StateProofs (request::StateProofs {
requests: vec![ requests: vec![
request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 }, request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 },
request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0}, request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0},
@ -449,7 +453,7 @@ fn get_state_proofs() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2); let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);
let mut response_stream = RlpStream::new_list(4); let mut response_stream = RlpStream::new_list(4);
response_stream.append(&req_id).append(&new_buf); response_stream.append(&req_id).append(&new_buf);
for proof in proofs { for proof in proofs {
response_stream.append_raw(&proof, 1); response_stream.append_raw(&proof, 1);
@ -465,15 +469,15 @@ fn get_state_proofs() {
#[test] #[test]
fn get_contract_code() { fn get_contract_code() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities(); let capabilities = capabilities();
let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let cur_status = status(provider.client.chain_info()); let cur_status = status(provider.client.chain_info());
{ {
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
} }
@ -481,7 +485,7 @@ fn get_contract_code() {
let key1 = U256::from(11223344).into(); let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into(); let key2 = U256::from(99988887).into();
let request = Request::Codes (request::ContractCodes { let request = Request::Codes (request::ContractCodes {
code_requests: vec![ code_requests: vec![
request::ContractCode { block_hash: H256::default(), account_key: key1 }, request::ContractCode { block_hash: H256::default(), account_key: key1 },
request::ContractCode { block_hash: H256::default(), account_key: key2 }, request::ContractCode { block_hash: H256::default(), account_key: key2 },
@ -498,7 +502,7 @@ fn get_contract_code() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2); let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);
let mut response_stream = RlpStream::new_list(4); let mut response_stream = RlpStream::new_list(4);
response_stream.append(&req_id).append(&new_buf); response_stream.append(&req_id).append(&new_buf);
for code in codes { for code in codes {
response_stream.append(&code); response_stream.append(&code);
@ -509,4 +513,4 @@ fn get_contract_code() {
let expected = Expect::Respond(packet::CONTRACT_CODES, response); let expected = Expect::Respond(packet::CONTRACT_CODES, response);
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body); proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body);
} }

View File

@ -351,8 +351,7 @@ struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay { impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
// TODO [ToDr] Can we get a peer enode somehow? self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id())
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None)
} }
} }

View File

@ -99,7 +99,7 @@ pub use stats::NetworkStats;
pub use session::SessionInfo; pub use session::SessionInfo;
use io::TimerToken; use io::TimerToken;
pub use node_table::is_valid_node_url; pub use node_table::{is_valid_node_url, NodeId};
const PROTOCOL_VERSION: u32 = 4; const PROTOCOL_VERSION: u32 = 4;