From 207f9d02f25c42b120bad0ea21ad1606a1c3f442 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 15 Aug 2016 14:25:57 +0200 Subject: [PATCH] Started inf networking --- ethcore/src/client/chain_notify.rs | 4 + ethcore/src/client/client.rs | 4 + ethcore/src/client/test_client.rs | 4 + ethcore/src/client/traits.rs | 3 + sync/src/api.rs | 63 ++++++++-- sync/src/infinity.rs | 190 +++++++++++++++++++++++++++++ sync/src/lib.rs | 1 + 7 files changed, 259 insertions(+), 10 deletions(-) create mode 100644 sync/src/infinity.rs diff --git a/ethcore/src/client/chain_notify.rs b/ethcore/src/client/chain_notify.rs index 897c8cfac..e4638f152 100644 --- a/ethcore/src/client/chain_notify.rs +++ b/ethcore/src/client/chain_notify.rs @@ -40,6 +40,10 @@ pub trait ChainNotify : Send + Sync { fn stop(&self) { // does nothing by default } + + /// fires when chain broadcasts a message + fn broadcast(&self, _data: Vec) { + } } impl IpcConfig for ChainNotify { } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index aced57e4c..bf871e4ad 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1020,6 +1020,10 @@ impl BlockChainClient for Client { fn pending_transactions(&self) -> Vec { self.miner.pending_transactions() } + + fn queue_infinity_message(&self, _message: Bytes) { + //TODO: handle message here + } } impl MiningBlockChainClient for Client { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 212dead9a..8852448cc 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -554,6 +554,10 @@ impl BlockChainClient for TestBlockChainClient { self.miner.import_external_transactions(self, txs); } + fn queue_infinity_message(&self, _packet: Bytes) { + unimplemented!(); + } + fn pending_transactions(&self) -> Vec { self.miner.pending_transactions() } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 271e95785..da876efa6 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -182,6 +182,9 @@ pub trait BlockChainClient : Sync + Send { /// Queue transactions for importing. fn queue_transactions(&self, transactions: Vec); + /// Queue packet + fn queue_infinity_message(&self, packet: Bytes); + /// list all transactions fn pending_transactions(&self) -> Vec; diff --git a/sync/src/api.rs b/sync/src/api.rs index 608d9d521..cb1c47229 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -23,6 +23,7 @@ use ethcore::client::{BlockChainClient, ChainNotify}; use ethcore::header::BlockNumber; use sync_io::NetSyncIo; use chain::{ChainSync, SyncStatus}; +use infinity::{InfinitySync}; use std::net::{SocketAddr, AddrParseError}; use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; @@ -30,6 +31,8 @@ use parking_lot::RwLock; /// Ethereum sync protocol pub const ETH_PROTOCOL: &'static str = "eth"; +/// Infinity protocol +pub const INF_PROTOCOL: &'static str = "inf"; /// Sync configuration #[derive(Debug, Clone)] @@ -65,18 +68,22 @@ pub trait SyncProvider: Send + Sync { pub struct EthSync { /// Network service network: NetworkService, - /// Protocol handler - handler: Arc, + /// Ethereum Protocol handler + eth_handler: Arc, + /// Infinity Protocol handler + inf_handler: Arc, } impl EthSync { /// Creates and register protocol with the network service pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, NetworkError> { + let inf_sync = InfinitySync::new(&config, chain.clone()); let chain_sync = ChainSync::new(config, &*chain); let service = try!(NetworkService::new(try!(network_config.into_basic()))); let sync = Arc::new(EthSync{ network: service, - handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }), + eth_handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain.clone() }), + inf_handler: Arc::new(InfProtocolHandler { sync: RwLock::new(inf_sync), chain: chain }), }); Ok(sync) @@ -88,12 +95,12 @@ impl EthSync { impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> SyncStatus { - self.handler.sync.write().status() + self.eth_handler.sync.write().status() } } struct SyncProtocolHandler { - /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + /// Shared blockchain client. chain: Arc, /// Sync strategy sync: RwLock, @@ -122,6 +129,33 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } } +struct InfProtocolHandler { + /// Shared blockchain client. + chain: Arc, + /// Sync strategy + sync: RwLock, +} + +impl NetworkProtocolHandler for InfProtocolHandler { + fn initialize(&self, _io: &NetworkContext) { + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + InfinitySync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain), *peer, packet_id, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain), *peer); + } + + fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) { + } +} + impl ChainNotify for EthSync { fn new_blocks(&self, imported: Vec, @@ -132,8 +166,8 @@ impl ChainNotify for EthSync { _duration: u64) { self.network.with_context(ETH_PROTOCOL, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.handler.chain); - self.handler.sync.write().chain_new_blocks( + let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); + self.eth_handler.sync.write().chain_new_blocks( &mut sync_io, &imported, &invalid, @@ -145,13 +179,22 @@ impl ChainNotify for EthSync { fn start(&self) { self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); - self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) + self.network.register_protocol(self.eth_handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); + self.network.register_protocol(self.inf_handler.clone(), INF_PROTOCOL, &[1u8]) + .unwrap_or_else(|e| warn!("Error registering infinity protocol: {:?}", e)); } fn stop(&self) { self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } + + fn broadcast(&self, message: Vec) { + self.network.with_context(ETH_PROTOCOL, |context| { + let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); + self.inf_handler.sync.write().propagate_packet(&mut sync_io, message.clone()); + }); + } } impl IpcConfig for ManageNetwork { } @@ -201,8 +244,8 @@ impl ManageNetwork for EthSync { fn stop_network(&self) { self.network.with_context(ETH_PROTOCOL, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.handler.chain); - self.handler.sync.write().abort(&mut sync_io); + let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); + self.eth_handler.sync.write().abort(&mut sync_io); }); self.stop(); } diff --git a/sync/src/infinity.rs b/sync/src/infinity.rs new file mode 100644 index 000000000..23886560e --- /dev/null +++ b/sync/src/infinity.rs @@ -0,0 +1,190 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +/// Infinity networking + +use util::*; +use network::*; +use ethcore::client::{BlockChainClient}; +use sync_io::SyncIo; +use super::SyncConfig; + +known_heap_size!(0, PeerInfo); + +type PacketDecodeError = DecoderError; + +const PROTOCOL_VERSION: u8 = 1u8; + +const STATUS_PACKET: u8 = 0x00; +const GENERIC_PACKET: u8 = 0x01; + +/// Syncing status and statistics +#[derive(Clone)] +pub struct NetworkStatus { + pub protocol_version: u8, + /// The underlying p2p network version. + pub network_id: U256, + /// Total number of connected peers + pub num_peers: usize, + /// Total number of active peers + pub num_active_peers: usize, +} + +#[derive(Clone)] +/// Inf peer information +struct PeerInfo { + /// inf protocol version + protocol_version: u32, + /// Peer chain genesis hash + genesis: H256, + /// Peer network id + network_id: U256, +} + +/// Infinity protocol handler. +pub struct InfinitySync { + chain: Arc, + /// All connected peers + peers: HashMap, + /// Network ID + network_id: U256, +} + +impl InfinitySync { + /// Create a new instance of syncing strategy. + pub fn new(config: &SyncConfig, chain: Arc) -> InfinitySync { + let mut sync = InfinitySync { + chain: chain, + peers: HashMap::new(), + network_id: config.network_id, + }; + sync.reset(); + sync + } + + /// @returns Synchonization status + pub fn _status(&self) -> NetworkStatus { + NetworkStatus { + protocol_version: 1, + network_id: self.network_id, + num_peers: self.peers.len(), + num_active_peers: 0, + } + } + + #[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()` + /// Reset sync. Clear all downloaded data but keep the queue + fn reset(&mut self) { + } + + /// Called by peer to report status + fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let peer = PeerInfo { + protocol_version: try!(r.val_at(0)), + network_id: try!(r.val_at(1)), + genesis: try!(r.val_at(2)), + }; + trace!(target: "inf", "New peer {} (protocol: {}, network: {:?}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.genesis); + if self.peers.contains_key(&peer_id) { + debug!(target: "inf", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); + return Ok(()); + } + let chain_info = io.chain().chain_info(); + if peer.genesis != chain_info.genesis_hash { + io.disable_peer(peer_id); + trace!(target: "inf", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis); + return Ok(()); + } + if peer.network_id != self.network_id { + io.disable_peer(peer_id); + trace!(target: "inf", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); + return Ok(()); + } + + self.peers.insert(peer_id.clone(), peer); + Ok(()) + } + + /// Called when a new peer is connected + pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "inf", "== Connected {}: {}", peer, io.peer_info(peer)); + if let Err(e) = self.send_status(io) { + debug!(target:"inf", "Error sending status request: {:?}", e); + io.disable_peer(peer); + } + } + + /// Generic packet sender + fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { + if self.peers.contains_key(&peer_id) { + if let Err(e) = sync.send(peer_id, packet_id, packet) { + debug!(target:"inf", "Error sending request: {:?}", e); + sync.disable_peer(peer_id); + } + } + } + + /// Called when peer sends us new transactions + fn on_peer_packet(&mut self, _io: &mut SyncIo, _peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + self.chain.queue_infinity_message(r.as_raw().to_vec()); + Ok(()) + } + + /// Called by peer when it is disconnecting + pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { + trace!(target: "inf", "== Disconnecting {}: {}", peer, io.peer_info(peer)); + if self.peers.contains_key(&peer) { + debug!(target: "inf", "Disconnected {}", peer); + self.peers.remove(&peer); + } + } + + /// Send Status message + fn send_status(&mut self, io: &mut SyncIo) -> Result<(), NetworkError> { + let mut packet = RlpStream::new_list(5); + let chain = io.chain().chain_info(); + packet.append(&(PROTOCOL_VERSION as u32)); + packet.append(&self.network_id); + packet.append(&chain.total_difficulty); + packet.append(&chain.best_block_hash); + packet.append(&chain.genesis_hash); + io.respond(STATUS_PACKET, packet.out()) + } + + pub fn dispatch_packet(sync: &RwLock, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { + let rlp = UntrustedRlp::new(data); + match packet_id { + STATUS_PACKET => sync.write().on_peer_status(io, peer, &rlp).unwrap_or_else( + |e| trace!(target: "inf", "Error processing packet: {:?}", e)), + GENERIC_PACKET => sync.write().on_peer_packet(io, peer, &rlp).unwrap_or_else( + |e| warn!(target: "inf", "Error queueing packet: {:?}", e)), + p @ _ => trace!(target: "inf", "Unexpected packet {} from {}", p, peer), + }; + } + + pub fn propagate_packet(&mut self, io: &mut SyncIo, packet: Bytes) { + let lucky_peers: Vec<_> = self.peers.keys().cloned().collect(); + trace!(target: "inf", "Sending packets to {:?}", lucky_peers); + for peer_id in lucky_peers { + self.send_packet(io, peer_id, GENERIC_PACKET, packet.clone()); + } + } +} + +#[cfg(test)] +mod tests { +} + diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 69dd03a2a..29bd50bb2 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -82,6 +82,7 @@ extern crate parking_lot; mod chain; mod blocks; mod sync_io; +mod infinity; #[cfg(test)] mod tests;