diff --git a/ethcore/light/src/net/context.rs b/ethcore/light/src/net/context.rs index c05e69b0f..af1f4c677 100644 --- a/ethcore/light/src/net/context.rs +++ b/ethcore/light/src/net/context.rs @@ -16,13 +16,13 @@ //! I/O and event context generalizations. -use network::{NetworkContext, PeerId}; +use network::{NetworkContext, PeerId, NodeId}; use super::{Announcement, LightProtocol, ReqId}; use super::error::Error; use request::Request; -/// An I/O context which allows sending and receiving packets as well as +/// An I/O context which allows sending and receiving packets as well as /// disconnecting peers. This is used as a generalization of the portions /// of a p2p network which the light protocol structure makes use of. pub trait IoContext { @@ -41,6 +41,9 @@ pub trait IoContext { /// Get a peer's protocol version. fn protocol_version(&self, peer: PeerId) -> Option; + + /// Persistent peer id + fn persistent_peer_id(&self, peer: PeerId) -> Option; } impl<'a> IoContext for NetworkContext<'a> { @@ -67,6 +70,10 @@ impl<'a> IoContext for NetworkContext<'a> { fn protocol_version(&self, peer: PeerId) -> Option { self.protocol_version(self.subprotocol_name(), peer) } + + fn persistent_peer_id(&self, peer: PeerId) -> Option { + self.session_info(peer).and_then(|info| info.id) + } } /// Context for a protocol event. @@ -75,6 +82,9 @@ pub trait EventContext { /// disconnected/connected peer. fn peer(&self) -> PeerId; + /// Returns the relevant's peer persistent Id (aka NodeId). + fn persistent_peer_id(&self) -> Option; + /// Make a request from a peer. fn request_from(&self, peer: PeerId, request: Request) -> Result; @@ -89,7 +99,7 @@ pub trait EventContext { fn disable_peer(&self, peer: PeerId); } -/// Concrete implementation of `EventContext` over the light protocol struct and +/// Concrete implementation of `EventContext` over the light protocol struct and /// an io context. pub struct Ctx<'a> { /// Io context to enable immediate response to events. @@ -97,11 +107,18 @@ pub struct Ctx<'a> { /// Protocol implementation. pub proto: &'a LightProtocol, /// Relevant peer for event. - pub peer: PeerId, + pub peer: PeerId, } impl<'a> EventContext for Ctx<'a> { - fn peer(&self) -> PeerId { self.peer } + + fn peer(&self) -> PeerId { + self.peer + } + + fn persistent_peer_id(&self) -> Option { + self.io.persistent_peer_id(self.peer) + } fn request_from(&self, peer: PeerId, request: Request) -> Result { self.proto.request_from(self.io, &peer, request) } @@ -117,4 +134,4 @@ impl<'a> EventContext for Ctx<'a> { fn disable_peer(&self, peer: PeerId) { self.io.disable_peer(peer); } -} \ No newline at end of file +} diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 876432ce2..e2a17a41e 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -15,13 +15,13 @@ // along with Parity. If not, see . //! Tests for the `LightProtocol` implementation. -//! These don't test of the higher level logic on top of +//! These don't test of the higher level logic on top of use ethcore::blockchain_info::BlockChainInfo; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient}; use ethcore::ids::BlockId; use ethcore::transaction::SignedTransaction; -use network::PeerId; +use network::{PeerId, NodeId}; use net::buffer_flow::FlowParams; use net::context::IoContext; @@ -68,6 +68,10 @@ impl IoContext for Expect { fn protocol_version(&self, _peer: PeerId) -> Option { Some(super::MAX_PROTOCOL_VERSION) } + + fn persistent_peer_id(&self, _peer: PeerId) -> Option { + None + } } // can't implement directly for Arc due to cross-crate orphan rules. @@ -106,7 +110,7 @@ impl Provider for TestProvider { .map(|x: u64| x.saturating_mul(req.skip + 1)) .take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x }) .map(|x| if req.reverse { start_num - x } else { start_num + x }) - .map(|x| self.0.client.block_header(BlockId::Number(x))) + .map(|x| self.0.client.block_header(BlockId::Number(x))) .take_while(|x| x.is_some()) .flat_map(|x| x) .collect() @@ -139,12 +143,12 @@ impl Provider for TestProvider { } } }) - .collect() + .collect() } fn contract_code(&self, req: request::ContractCodes) -> Vec { req.code_requests.into_iter() - .map(|req| { + .map(|req| { req.account_key.iter().chain(req.account_key.iter()).cloned().collect() }) .collect() @@ -202,9 +206,9 @@ fn status(chain_info: BlockChainInfo) -> Status { #[test] fn handshake_expected() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let status = status(provider.client.chain_info()); @@ -217,9 +221,9 @@ fn handshake_expected() { #[should_panic] fn genesis_mismatch() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let mut status = status(provider.client.chain_info()); status.genesis_hash = H256::default(); @@ -232,15 +236,15 @@ fn genesis_mismatch() { #[test] fn buffer_overflow() { let flow_params = make_flow_params(); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); } { @@ -266,9 +270,9 @@ fn buffer_overflow() { #[test] fn get_block_headers() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -278,8 +282,8 @@ fn get_block_headers() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -300,7 +304,7 @@ fn get_block_headers() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10); let mut response_stream = RlpStream::new_list(12); - + response_stream.append(&req_id).append(&new_buf); for header in headers { response_stream.append_raw(&header, 1); @@ -316,9 +320,9 @@ fn get_block_headers() { #[test] fn get_block_bodies() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -328,8 +332,8 @@ fn get_block_bodies() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -347,7 +351,7 @@ fn get_block_bodies() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10); let mut response_stream = RlpStream::new_list(12); - + response_stream.append(&req_id).append(&new_buf); for body in bodies { response_stream.append_raw(&body, 1); @@ -363,9 +367,9 @@ fn get_block_bodies() { #[test] fn get_block_receipts() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params)); @@ -375,8 +379,8 @@ fn get_block_receipts() { let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body)); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status); } @@ -400,7 +404,7 @@ fn get_block_receipts() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len()); let mut response_stream = RlpStream::new_list(2 + receipts.len()); - + response_stream.append(&req_id).append(&new_buf); for block_receipts in receipts { response_stream.append_raw(&block_receipts, 1); @@ -416,15 +420,15 @@ fn get_block_receipts() { #[test] fn get_state_proofs() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -432,7 +436,7 @@ fn get_state_proofs() { let key1 = U256::from(11223344).into(); let key2 = U256::from(99988887).into(); - let request = Request::StateProofs (request::StateProofs { + let request = Request::StateProofs (request::StateProofs { requests: vec![ request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 }, request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0}, @@ -449,7 +453,7 @@ fn get_state_proofs() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2); let mut response_stream = RlpStream::new_list(4); - + response_stream.append(&req_id).append(&new_buf); for proof in proofs { response_stream.append_raw(&proof, 1); @@ -465,15 +469,15 @@ fn get_state_proofs() { #[test] fn get_contract_code() { let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into()); - let capabilities = capabilities(); + let capabilities = capabilities(); - let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); + let (provider, proto) = setup(flow_params.clone(), capabilities.clone()); let cur_status = status(provider.client.chain_info()); { - let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); - proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); + let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params)); + proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone())); proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body); } @@ -481,7 +485,7 @@ fn get_contract_code() { let key1 = U256::from(11223344).into(); let key2 = U256::from(99988887).into(); - let request = Request::Codes (request::ContractCodes { + let request = Request::Codes (request::ContractCodes { code_requests: vec![ request::ContractCode { block_hash: H256::default(), account_key: key1 }, request::ContractCode { block_hash: H256::default(), account_key: key2 }, @@ -498,7 +502,7 @@ fn get_contract_code() { let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2); let mut response_stream = RlpStream::new_list(4); - + response_stream.append(&req_id).append(&new_buf); for code in codes { response_stream.append(&code); @@ -509,4 +513,4 @@ fn get_contract_code() { let expected = Expect::Respond(packet::CONTRACT_CODES, response); proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body); -} \ No newline at end of file +} diff --git a/sync/src/api.rs b/sync/src/api.rs index 10434ce26..0f3695fe9 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -351,8 +351,7 @@ struct TxRelay(Arc); impl LightHandler for TxRelay { fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); - // TODO [ToDr] Can we get a peer enode somehow? - self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), None) + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect(), ctx.persistent_peer_id()) } } diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index f21cb498d..a1eef68fa 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -99,7 +99,7 @@ pub use stats::NetworkStats; pub use session::SessionInfo; use io::TimerToken; -pub use node_table::is_valid_node_url; +pub use node_table::{is_valid_node_url, NodeId}; const PROTOCOL_VERSION: u32 = 4;