Started inf networking
This commit is contained in:
		
							parent
							
								
									535c502771
								
							
						
					
					
						commit
						207f9d02f2
					
				| @ -40,6 +40,10 @@ pub trait ChainNotify : Send + Sync { | ||||
| 	fn stop(&self) { | ||||
| 		// does nothing by default
 | ||||
| 	} | ||||
| 
 | ||||
| 	/// fires when chain broadcasts a message
 | ||||
| 	fn broadcast(&self, _data: Vec<u8>) { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl IpcConfig for ChainNotify { } | ||||
|  | ||||
| @ -1020,6 +1020,10 @@ impl BlockChainClient for Client { | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction> { | ||||
| 		self.miner.pending_transactions() | ||||
| 	} | ||||
| 
 | ||||
| 	fn queue_infinity_message(&self, _message: Bytes) { | ||||
| 		//TODO: handle message here
 | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl MiningBlockChainClient for Client { | ||||
|  | ||||
| @ -554,6 +554,10 @@ impl BlockChainClient for TestBlockChainClient { | ||||
| 		self.miner.import_external_transactions(self, txs); | ||||
| 	} | ||||
| 
 | ||||
| 	fn queue_infinity_message(&self, _packet: Bytes) { | ||||
| 		unimplemented!(); | ||||
| 	} | ||||
| 
 | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction> { | ||||
| 		self.miner.pending_transactions() | ||||
| 	} | ||||
|  | ||||
| @ -182,6 +182,9 @@ pub trait BlockChainClient : Sync + Send { | ||||
| 	/// Queue transactions for importing.
 | ||||
| 	fn queue_transactions(&self, transactions: Vec<Bytes>); | ||||
| 
 | ||||
| 	/// Queue packet
 | ||||
| 	fn queue_infinity_message(&self, packet: Bytes); | ||||
| 
 | ||||
| 	/// list all transactions
 | ||||
| 	fn pending_transactions(&self) -> Vec<SignedTransaction>; | ||||
| 
 | ||||
|  | ||||
| @ -23,6 +23,7 @@ use ethcore::client::{BlockChainClient, ChainNotify}; | ||||
| use ethcore::header::BlockNumber; | ||||
| use sync_io::NetSyncIo; | ||||
| use chain::{ChainSync, SyncStatus}; | ||||
| use infinity::{InfinitySync}; | ||||
| use std::net::{SocketAddr, AddrParseError}; | ||||
| use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; | ||||
| use std::str::FromStr; | ||||
| @ -30,6 +31,8 @@ use parking_lot::RwLock; | ||||
| 
 | ||||
| /// Ethereum sync protocol
 | ||||
| pub const ETH_PROTOCOL: &'static str = "eth"; | ||||
| /// Infinity protocol
 | ||||
| pub const INF_PROTOCOL: &'static str = "inf"; | ||||
| 
 | ||||
| /// Sync configuration
 | ||||
| #[derive(Debug, Clone)] | ||||
| @ -65,18 +68,22 @@ pub trait SyncProvider: Send + Sync { | ||||
| pub struct EthSync { | ||||
| 	/// Network service
 | ||||
| 	network: NetworkService, | ||||
| 	/// Protocol handler
 | ||||
| 	handler: Arc<SyncProtocolHandler>, | ||||
| 	/// Ethereum Protocol handler
 | ||||
| 	eth_handler: Arc<SyncProtocolHandler>, | ||||
| 	/// Infinity Protocol handler
 | ||||
| 	inf_handler: Arc<InfProtocolHandler>, | ||||
| } | ||||
| 
 | ||||
| impl EthSync { | ||||
| 	/// Creates and register protocol with the network service
 | ||||
| 	pub fn new(config: SyncConfig, chain: Arc<BlockChainClient>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, NetworkError> { | ||||
| 		let inf_sync = InfinitySync::new(&config, chain.clone()); | ||||
| 		let chain_sync = ChainSync::new(config, &*chain); | ||||
| 		let service = try!(NetworkService::new(try!(network_config.into_basic()))); | ||||
| 		let sync = Arc::new(EthSync{ | ||||
| 			network: service, | ||||
| 			handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }), | ||||
| 			eth_handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain.clone() }), | ||||
| 			inf_handler: Arc::new(InfProtocolHandler { sync: RwLock::new(inf_sync), chain: chain }), | ||||
| 		}); | ||||
| 
 | ||||
| 		Ok(sync) | ||||
| @ -88,12 +95,12 @@ impl EthSync { | ||||
| impl SyncProvider for EthSync { | ||||
| 	/// Get sync status
 | ||||
| 	fn status(&self) -> SyncStatus { | ||||
| 		self.handler.sync.write().status() | ||||
| 		self.eth_handler.sync.write().status() | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| struct SyncProtocolHandler { | ||||
| 	/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
 | ||||
| 	/// Shared blockchain client.
 | ||||
| 	chain: Arc<BlockChainClient>, | ||||
| 	/// Sync strategy
 | ||||
| 	sync: RwLock<ChainSync>, | ||||
| @ -122,6 +129,33 @@ impl NetworkProtocolHandler for SyncProtocolHandler { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| struct InfProtocolHandler { | ||||
| 	/// Shared blockchain client.
 | ||||
| 	chain: Arc<BlockChainClient>, | ||||
| 	/// Sync strategy
 | ||||
| 	sync: RwLock<InfinitySync>, | ||||
| } | ||||
| 
 | ||||
| impl NetworkProtocolHandler for InfProtocolHandler { | ||||
| 	fn initialize(&self, _io: &NetworkContext) { | ||||
| 	} | ||||
| 
 | ||||
| 	fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { | ||||
| 		InfinitySync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, &*self.chain), *peer, packet_id, data); | ||||
| 	} | ||||
| 
 | ||||
| 	fn connected(&self, io: &NetworkContext, peer: &PeerId) { | ||||
| 		self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, &*self.chain), *peer); | ||||
| 	} | ||||
| 
 | ||||
| 	fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { | ||||
| 		self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, &*self.chain), *peer); | ||||
| 	} | ||||
| 
 | ||||
| 	fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl ChainNotify for EthSync { | ||||
| 	fn new_blocks(&self, | ||||
| 		imported: Vec<H256>, | ||||
| @ -132,8 +166,8 @@ impl ChainNotify for EthSync { | ||||
| 		_duration: u64) | ||||
| 	{ | ||||
| 		self.network.with_context(ETH_PROTOCOL, |context| { | ||||
| 			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain); | ||||
| 			self.handler.sync.write().chain_new_blocks( | ||||
| 			let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); | ||||
| 			self.eth_handler.sync.write().chain_new_blocks( | ||||
| 				&mut sync_io, | ||||
| 				&imported, | ||||
| 				&invalid, | ||||
| @ -145,13 +179,22 @@ impl ChainNotify for EthSync { | ||||
| 
 | ||||
| 	fn start(&self) { | ||||
| 		self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); | ||||
| 		self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) | ||||
| 		self.network.register_protocol(self.eth_handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) | ||||
| 			.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); | ||||
| 		self.network.register_protocol(self.inf_handler.clone(), INF_PROTOCOL, &[1u8]) | ||||
| 			.unwrap_or_else(|e| warn!("Error registering infinity protocol: {:?}", e)); | ||||
| 	} | ||||
| 
 | ||||
| 	fn stop(&self) { | ||||
| 		self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); | ||||
| 	} | ||||
| 
 | ||||
| 	fn broadcast(&self, message: Vec<u8>) { | ||||
| 		self.network.with_context(ETH_PROTOCOL, |context| { | ||||
| 			let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); | ||||
| 			self.inf_handler.sync.write().propagate_packet(&mut sync_io, message.clone()); | ||||
| 		}); | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| impl IpcConfig for ManageNetwork { } | ||||
| @ -201,8 +244,8 @@ impl ManageNetwork for EthSync { | ||||
| 
 | ||||
| 	fn stop_network(&self) { | ||||
| 		self.network.with_context(ETH_PROTOCOL, |context| { | ||||
| 			let mut sync_io = NetSyncIo::new(context, &*self.handler.chain); | ||||
| 			self.handler.sync.write().abort(&mut sync_io); | ||||
| 			let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain); | ||||
| 			self.eth_handler.sync.write().abort(&mut sync_io); | ||||
| 		}); | ||||
| 		self.stop(); | ||||
| 	} | ||||
|  | ||||
							
								
								
									
										190
									
								
								sync/src/infinity.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										190
									
								
								sync/src/infinity.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,190 @@ | ||||
| // Copyright 2015, 2016 Ethcore (UK) Ltd.
 | ||||
| // This file is part of Parity.
 | ||||
| 
 | ||||
| // Parity is free software: you can redistribute it and/or modify
 | ||||
| // it under the terms of the GNU General Public License as published by
 | ||||
| // the Free Software Foundation, either version 3 of the License, or
 | ||||
| // (at your option) any later version.
 | ||||
| 
 | ||||
| // Parity is distributed in the hope that it will be useful,
 | ||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | ||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | ||||
| // GNU General Public License for more details.
 | ||||
| 
 | ||||
| // You should have received a copy of the GNU General Public License
 | ||||
| // along with Parity.  If not, see <http://www.gnu.org/licenses/>.
 | ||||
| 
 | ||||
| /// Infinity networking
 | ||||
| 
 | ||||
| use util::*; | ||||
| use network::*; | ||||
| use ethcore::client::{BlockChainClient}; | ||||
| use sync_io::SyncIo; | ||||
| use super::SyncConfig; | ||||
| 
 | ||||
| known_heap_size!(0, PeerInfo); | ||||
| 
 | ||||
| type PacketDecodeError = DecoderError; | ||||
| 
 | ||||
| const PROTOCOL_VERSION: u8 = 1u8; | ||||
| 
 | ||||
| const STATUS_PACKET: u8 = 0x00; | ||||
| const GENERIC_PACKET: u8 = 0x01; | ||||
| 
 | ||||
| /// Syncing status and statistics
 | ||||
| #[derive(Clone)] | ||||
| pub struct NetworkStatus { | ||||
| 	pub protocol_version: u8, | ||||
| 	/// The underlying p2p network version.
 | ||||
| 	pub network_id: U256, | ||||
| 	/// Total number of connected peers
 | ||||
| 	pub num_peers: usize, | ||||
| 	/// Total number of active peers
 | ||||
| 	pub num_active_peers: usize, | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| /// Inf peer information
 | ||||
| struct PeerInfo { | ||||
| 	/// inf protocol version
 | ||||
| 	protocol_version: u32, | ||||
| 	/// Peer chain genesis hash
 | ||||
| 	genesis: H256, | ||||
| 	/// Peer network id
 | ||||
| 	network_id: U256, | ||||
| } | ||||
| 
 | ||||
| /// Infinity protocol handler.
 | ||||
| pub struct InfinitySync { | ||||
| 	chain: Arc<BlockChainClient>, | ||||
| 	/// All connected peers
 | ||||
| 	peers: HashMap<PeerId, PeerInfo>, | ||||
| 	/// Network ID
 | ||||
| 	network_id: U256, | ||||
| } | ||||
| 
 | ||||
| impl InfinitySync { | ||||
| 	/// Create a new instance of syncing strategy.
 | ||||
| 	pub fn new(config: &SyncConfig, chain: Arc<BlockChainClient>) -> InfinitySync { | ||||
| 		let mut sync = InfinitySync { | ||||
| 			chain: chain, | ||||
| 			peers: HashMap::new(), | ||||
| 			network_id: config.network_id, | ||||
| 		}; | ||||
| 		sync.reset(); | ||||
| 		sync | ||||
| 	} | ||||
| 
 | ||||
| 	/// @returns Synchonization status
 | ||||
| 	pub fn _status(&self) -> NetworkStatus { | ||||
| 		NetworkStatus { | ||||
| 			protocol_version: 1, | ||||
| 			network_id: self.network_id, | ||||
| 			num_peers: self.peers.len(), | ||||
| 			num_active_peers: 0, | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	#[cfg_attr(feature="dev", allow(for_kv_map))] // Because it's not possible to get `values_mut()`
 | ||||
| 	/// Reset sync. Clear all downloaded data but keep the queue
 | ||||
| 	fn reset(&mut self) { | ||||
| 	} | ||||
| 
 | ||||
| 	/// Called by peer to report status
 | ||||
| 	fn on_peer_status(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { | ||||
| 		let peer = PeerInfo { | ||||
| 			protocol_version: try!(r.val_at(0)), | ||||
| 			network_id: try!(r.val_at(1)), | ||||
| 			genesis: try!(r.val_at(2)), | ||||
| 		}; | ||||
| 		trace!(target: "inf", "New peer {} (protocol: {}, network: {:?}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.genesis); | ||||
| 		if self.peers.contains_key(&peer_id) { | ||||
| 			debug!(target: "inf", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); | ||||
| 			return Ok(()); | ||||
| 		} | ||||
| 		let chain_info = io.chain().chain_info(); | ||||
| 		if peer.genesis != chain_info.genesis_hash { | ||||
| 			io.disable_peer(peer_id); | ||||
| 			trace!(target: "inf", "Peer {} genesis hash mismatch (ours: {}, theirs: {})", peer_id, chain_info.genesis_hash, peer.genesis); | ||||
| 			return Ok(()); | ||||
| 		} | ||||
| 		if peer.network_id != self.network_id { | ||||
| 			io.disable_peer(peer_id); | ||||
| 			trace!(target: "inf", "Peer {} network id mismatch (ours: {}, theirs: {})", peer_id, self.network_id, peer.network_id); | ||||
| 			return Ok(()); | ||||
| 		} | ||||
| 
 | ||||
| 		self.peers.insert(peer_id.clone(), peer); | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Called when a new peer is connected
 | ||||
| 	pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) { | ||||
| 		trace!(target: "inf", "== Connected {}: {}", peer, io.peer_info(peer)); | ||||
| 		if let Err(e) = self.send_status(io) { | ||||
| 			debug!(target:"inf", "Error sending status request: {:?}", e); | ||||
| 			io.disable_peer(peer); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Generic packet sender
 | ||||
| 	fn send_packet(&mut self, sync: &mut SyncIo, peer_id: PeerId, packet_id: PacketId, packet: Bytes) { | ||||
| 		if self.peers.contains_key(&peer_id) { | ||||
| 			if let Err(e) = sync.send(peer_id, packet_id, packet) { | ||||
| 				debug!(target:"inf", "Error sending request: {:?}", e); | ||||
| 				sync.disable_peer(peer_id); | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Called when peer sends us new transactions
 | ||||
| 	fn on_peer_packet(&mut self, _io: &mut SyncIo, _peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { | ||||
| 		self.chain.queue_infinity_message(r.as_raw().to_vec()); | ||||
| 		Ok(()) | ||||
| 	} | ||||
| 
 | ||||
| 	/// Called by peer when it is disconnecting
 | ||||
| 	pub fn on_peer_aborting(&mut self, io: &mut SyncIo, peer: PeerId) { | ||||
| 		trace!(target: "inf", "== Disconnecting {}: {}", peer, io.peer_info(peer)); | ||||
| 		if self.peers.contains_key(&peer) { | ||||
| 			debug!(target: "inf", "Disconnected {}", peer); | ||||
| 			self.peers.remove(&peer); | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	/// Send Status message
 | ||||
| 	fn send_status(&mut self, io: &mut SyncIo) -> Result<(), NetworkError> { | ||||
| 		let mut packet = RlpStream::new_list(5); | ||||
| 		let chain = io.chain().chain_info(); | ||||
| 		packet.append(&(PROTOCOL_VERSION as u32)); | ||||
| 		packet.append(&self.network_id); | ||||
| 		packet.append(&chain.total_difficulty); | ||||
| 		packet.append(&chain.best_block_hash); | ||||
| 		packet.append(&chain.genesis_hash); | ||||
| 		io.respond(STATUS_PACKET, packet.out()) | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn dispatch_packet(sync: &RwLock<InfinitySync>, io: &mut SyncIo, peer: PeerId, packet_id: u8, data: &[u8]) { | ||||
| 		let rlp = UntrustedRlp::new(data); | ||||
| 		match packet_id { | ||||
| 			STATUS_PACKET => sync.write().on_peer_status(io, peer, &rlp).unwrap_or_else( | ||||
| 				|e| trace!(target: "inf", "Error processing packet: {:?}", e)), | ||||
| 			GENERIC_PACKET => sync.write().on_peer_packet(io, peer, &rlp).unwrap_or_else( | ||||
| 				|e| warn!(target: "inf", "Error queueing packet: {:?}", e)), | ||||
| 			p @ _ => trace!(target: "inf", "Unexpected packet {} from {}", p, peer), | ||||
| 		}; | ||||
| 	} | ||||
| 
 | ||||
| 	pub fn propagate_packet(&mut self, io: &mut SyncIo, packet: Bytes) { | ||||
| 		let lucky_peers: Vec<_> = self.peers.keys().cloned().collect(); | ||||
| 		trace!(target: "inf", "Sending packets to {:?}", lucky_peers); | ||||
| 		for peer_id in lucky_peers { | ||||
| 			self.send_packet(io, peer_id, GENERIC_PACKET, packet.clone()); | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod tests { | ||||
| } | ||||
| 
 | ||||
| @ -82,6 +82,7 @@ extern crate parking_lot; | ||||
| mod chain; | ||||
| mod blocks; | ||||
| mod sync_io; | ||||
| mod infinity; | ||||
| 
 | ||||
| #[cfg(test)] | ||||
| mod tests; | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user