diff --git a/src/bin/client.rs b/src/bin/client.rs index 6809a8229..97e587989 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -5,21 +5,20 @@ extern crate rustc_serialize; use std::io::*; use std::env; use std::sync::Arc; -use rustc_serialize::hex::FromHex; use util::hash::*; use util::network::{NetworkService}; use ethcore::client::Client; use ethcore::sync::EthSync; +use ethcore::spec::Spec; fn main() { let mut service = NetworkService::start().unwrap(); //TODO: replace with proper genesis and chain params. - let genesis = "f901fcf901f7a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347948888f1f195afa192cfee860698584c030f4c9db1a07dba07d6b448a186e9612e5f737d1c909dce473e53199901a302c00646d523c1a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302000080832fefd8808454c98c8142a059262c330941f3fe2a34d16d6e3c7b30d2ceb37c6a0e9a994c494ee1a61d2410885aa4c8bf8e56e264c0c0".from_hex().unwrap(); + let frontier = Spec::new_frontier(); let mut dir = env::temp_dir(); dir.push(H32::random().hex()); - let client = Arc::new(Client::new(&genesis, &dir)); - let sync = Box::new(EthSync::new(client)); - service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); + let client = Arc::new(Client::new(&frontier.genesis_block(), &dir)); + EthSync::register(&mut service, client); loop { let mut cmd = String::new(); stdin().read_line(&mut cmd).unwrap(); diff --git a/src/sync/chain.rs b/src/sync/chain.rs index 5d0798c80..b7d550c9b 100644 --- a/src/sync/chain.rs +++ b/src/sync/chain.rs @@ -1,3 +1,18 @@ +/// +/// BlockChain synchronization strategy. +/// Syncs to peers and keeps up to date. +/// This implementation uses ethereum protocol v63 +/// +/// Syncing strategy. +/// +/// 1. A peer arrives with a total difficulty better than ours +/// 2. Find a common best block between our an peer chain. +/// Start with out best block and request headers from peer backwards until a common block is found +/// 3. Download headers and block bodies from peers in parallel. +/// As soon as a set of the blocks is fully downloaded at the head of the queue it is fed to the blockchain +/// 4. Maintain sync by handling NewBlocks/NewHashes messages +/// + use std::collections::{HashSet, HashMap}; use std::cmp::{min, max}; use std::mem::{replace}; @@ -13,7 +28,7 @@ use client::{BlockNumber, BlockChainClient, BlockStatus, QueueStatus, ImportResu use views::{HeaderView}; use header::{Header as BlockHeader}; use sync::range_collection::{RangeCollection, ToUsize, FromUsize}; -use sync::{SyncIo}; +use sync::io::SyncIo; impl ToUsize for BlockNumber { fn to_usize(&self) -> usize { @@ -52,7 +67,7 @@ const GET_RECEIPTS_PACKET: u8 = 0x0f; const RECEIPTS_PACKET: u8 = 0x10; struct Header { - ///Header data + /// Header data data: Bytes, /// Block hash hash: H256, @@ -81,13 +96,21 @@ pub enum SyncState { NewBlocks, } +/// Syncing status and statistics pub struct SyncStatus { + /// State pub state: SyncState, + /// Syncing protocol version. That's the maximum protocol version we connect to. pub protocol_version: u8, + /// BlockChain height for the moment the sync started. pub start_block_number: BlockNumber, + /// Last fully downloaded and imported block number. pub last_imported_block_number: BlockNumber, + /// Highest block number in the download queue. pub highest_block_number: BlockNumber, + /// Total number of blocks for the sync process. pub blocks_total: usize, + /// Number of blocks downloaded so far. pub blocks_received: usize, } @@ -178,11 +201,7 @@ impl ChainSync { self.peers.clear(); } - /// @returns true is Sync is in progress - pub fn is_syncing(&self) -> bool { - self.state != SyncState::Idle - } - + /// Rest sync. Clear all downloaded data but keep the queue fn reset(&mut self) { self.downloading_headers.clear(); self.downloading_bodies.clear(); @@ -389,6 +408,7 @@ impl ChainSync { Ok(()) } + /// Handles NewHashes packet. Initiates headers download for any unknown hashes. fn on_peer_new_hashes(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { if self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer").asking != PeerAsking::Nothing { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); @@ -432,6 +452,7 @@ impl ChainSync { self.continue_sync(io); } + /// Called when a new peer is connected pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: &PeerId) { trace!(target: "sync", "== Connected {}", peer); self.send_status(io, peer); @@ -459,6 +480,7 @@ impl ChainSync { self.state = SyncState::Waiting; } + /// Find something to do for a peer. Called for a new peer or when a peer is done with it's task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: &PeerId, force: bool) { let (peer_latest, peer_difficulty) = { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); @@ -488,6 +510,7 @@ impl ChainSync { } } + /// Find some headers or blocks to download for a peer. fn request_blocks(&mut self, io: &mut SyncIo, peer_id: &PeerId) { self.clear_peer_download(peer_id); @@ -572,6 +595,7 @@ impl ChainSync { } } + /// Clear all blocks/headers marked as being downloaded by a peer. fn clear_peer_download(&mut self, peer_id: &PeerId) { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); for b in &peer.asking_blocks { @@ -581,6 +605,7 @@ impl ChainSync { peer.asking_blocks.clear(); } + /// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import. fn collect_blocks(&mut self, io: &mut SyncIo) { if !self.have_common_block || self.headers.is_empty() || self.bodies.is_empty() { return; @@ -647,6 +672,8 @@ impl ChainSync { } } + /// Remove downloaded bocks/headers starting from specified number. + /// Used to recover from an error and re-download parts of the chain detected as bad. fn remove_downloaded_blocks(&mut self, start: BlockNumber) { for n in self.headers.get_tail(&start) { match self.headers.find_item(&n) { @@ -667,8 +694,9 @@ impl ChainSync { self.bodies.remove_tail(&start); } + /// Request headers from a peer by block hash fn request_headers_by_hash(&mut self, sync: &mut SyncIo, peer_id: &PeerId, h: &H256, count: usize, skip: usize, reverse: bool) { - trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, h); let mut rlp = RlpStream::new_list(4); rlp.append(h); rlp.append(&count); @@ -677,9 +705,10 @@ impl ChainSync { self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); } + /// Request headers from a peer by block number fn request_headers_by_number(&mut self, sync: &mut SyncIo, peer_id: &PeerId, n: BlockNumber, count: usize, skip: usize, reverse: bool) { let mut rlp = RlpStream::new_list(4); - trace!(target: "sync", "{}<- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); + trace!(target: "sync", "{} <- GetBlockHeaders: {} entries starting from {}", peer_id, count, n); rlp.append(&n); rlp.append(&count); rlp.append(&skip); @@ -687,15 +716,17 @@ impl ChainSync { self.send_request(sync, peer_id, PeerAsking::BlockHeaders, GET_BLOCK_HEADERS_PACKET, rlp.out()); } + /// Request block bodies from a peer fn request_bodies(&mut self, sync: &mut SyncIo, peer_id: &PeerId, hashes: Vec) { let mut rlp = RlpStream::new_list(hashes.len()); - trace!(target: "sync", "{}<- GetBlockBodies: {} entries", peer_id, hashes.len()); + trace!(target: "sync", "{} <- GetBlockBodies: {} entries", peer_id, hashes.len()); for h in hashes { rlp.append(&h); } self.send_request(sync, peer_id, PeerAsking::BlockBodies, GET_BLOCK_BODIES_PACKET, rlp.out()); } + /// Reset peer status after request is complete. fn reset_peer_asking(&mut self, peer_id: &PeerId, asking: PeerAsking) { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); if peer.asking != asking { @@ -706,6 +737,7 @@ impl ChainSync { } } + /// Generic request sender fn send_request(&mut self, sync: &mut SyncIo, peer_id: &PeerId, asking: PeerAsking, packet_id: PacketId, packet: Bytes) { { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); @@ -726,10 +758,12 @@ impl ChainSync { } } + /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: &PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { Ok(()) } + /// Send Status message fn send_status(&mut self, io: &mut SyncIo, peer_id: &PeerId) { let mut packet = RlpStream::new_list(5); let chain = io.chain().chain_info(); @@ -748,6 +782,7 @@ impl ChainSync { } } + /// Respond to GetBlockHeaders request fn return_block_headers(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { // Packet layout: // [ block: { P , B_32 }, maxHeaders: P, skip: P, reverse: P in { 0 , 1 } ] @@ -804,6 +839,7 @@ impl ChainSync { Ok(()) } + /// Respond to GetBlockBodies request fn return_block_bodies(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { @@ -831,6 +867,7 @@ impl ChainSync { Ok(()) } + /// Respond to GetNodeData request fn return_node_data(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { @@ -856,6 +893,7 @@ impl ChainSync { Ok(()) } + /// Respond to GetReceipts request fn return_receipts(&self, io: &mut SyncIo, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { let mut count = r.item_count(); if count == 0 { @@ -881,6 +919,7 @@ impl ChainSync { Ok(()) } + /// Dispatch incoming requests and responses pub fn on_packet(&mut self, io: &mut SyncIo, peer: &PeerId, packet_id: u8, data: &[u8]) { let rlp = UntrustedRlp::new(data); let result = match packet_id { @@ -904,6 +943,7 @@ impl ChainSync { }) } + /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, _io: &mut SyncIo) { } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fc3553180..18f2288b1 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,88 +1,73 @@ +/// Blockchain sync module +/// Implements ethereum protocol version 63 as specified here: +/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol +/// +/// Usage example: +/// +/// ```rust +/// extern crate ethcore_util as util; +/// extern crate ethcore; +/// use std::env; +/// use std::sync::Arc; +/// use util::network::NetworkService; +/// use ethcore::client::Client; +/// use ethcore::sync::EthSync; +/// use ethcore::spec::Spec; +/// +/// fn main() { +/// let mut service = NetworkService::start().unwrap(); +/// let frontier = Spec::new_frontier(); +/// let dir = env::temp_dir(); +/// let client = Arc::new(Client::new(&frontier.genesis_block(), &dir)); +/// EthSync::register(&mut service, client); +/// } +/// ``` + use std::sync::Arc; use client::BlockChainClient; -use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, PacketId, Message, Error as NetworkError}; +use util::network::{ProtocolHandler, NetworkService, HandlerIo, TimerToken, PeerId, Message}; use sync::chain::ChainSync; +use sync::io::NetSyncIo; mod chain; +mod io; mod range_collection; #[cfg(test)] mod tests; -pub fn new(_service: &mut NetworkService, eth_client: Arc) -> EthSync { - EthSync { - chain: eth_client, - sync: ChainSync::new(), - } -} - -pub trait SyncIo { - fn disable_peer(&mut self, peer_id: &PeerId); - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>; - fn chain<'s>(&'s mut self) -> &'s mut BlockChainClient; -} - -pub struct NetSyncIo<'s, 'h> where 'h:'s { - network: &'s mut HandlerIo<'h>, - chain: &'s mut BlockChainClient -} - -impl<'s, 'h> NetSyncIo<'s, 'h> { - pub fn new(network: &'s mut HandlerIo<'h>, chain: &'s mut BlockChainClient) -> NetSyncIo<'s,'h> { - NetSyncIo { - network: network, - chain: chain, - } - } -} - -impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> { - fn disable_peer(&mut self, peer_id: &PeerId) { - self.network.disable_peer(*peer_id); - } - - fn respond(&mut self, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>{ - self.network.respond(packet_id, data) - } - - fn send(&mut self, peer_id: PeerId, packet_id: PacketId, data: Vec) -> Result<(), NetworkError>{ - self.network.send(peer_id, packet_id, data) - } - - fn chain<'a>(&'a mut self) -> &'a mut BlockChainClient { - self.chain - } -} - +/// Ethereum network protocol handler pub struct EthSync { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint chain: Arc, + /// Sync strategy sync: ChainSync } pub use self::chain::SyncStatus; impl EthSync { - pub fn new(chain: Arc) -> EthSync { - EthSync { + /// Creates and register protocol with the network service + pub fn register(service: &mut NetworkService, chain: Arc) { + let sync = Box::new(EthSync { chain: chain, sync: ChainSync::new(), - } - } - - pub fn is_syncing(&self) -> bool { - self.sync.is_syncing() + }); + service.register_protocol(sync, "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); } + /// Get sync status pub fn status(&self) -> SyncStatus { self.sync.status() } - pub fn stop_network(&mut self, io: &mut HandlerIo) { + /// Stop sync + pub fn stop(&mut self, io: &mut HandlerIo) { self.sync.abort(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } - pub fn start_network(&mut self, io: &mut HandlerIo) { + /// Restart sync + pub fn restart(&mut self, io: &mut HandlerIo) { self.sync.restart(&mut NetSyncIo::new(io, Arc::get_mut(&mut self.chain).unwrap())); } } diff --git a/src/sync/range_collection.rs b/src/sync/range_collection.rs index 822e88d75..d212625be 100644 --- a/src/sync/range_collection.rs +++ b/src/sync/range_collection.rs @@ -1,3 +1,6 @@ +/// This module defines a trait for a collection of ranged values and an implementation +/// for this trait over sorted vector. + use std::ops::{Add, Sub, Range}; pub trait ToUsize { @@ -8,16 +11,28 @@ pub trait FromUsize { fn from_usize(s: usize) -> Self; } +/// A key-value collection orderd by key with sequential key-value pairs grouped together. +/// Such group is called a range. +/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]} pub trait RangeCollection { + /// Check if the given key is present in the collection. fn have_item(&self, key: &K) -> bool; + /// Get value by key. fn find_item(&self, key: &K) -> Option<&V>; + /// Get a range of keys from `key` till the end of the range that has `key` + /// Returns an empty range is key does not exist. fn get_tail(&mut self, key: &K) -> Range; + /// Remove all elements < `start` in the range that contains `start` - 1 fn remove_head(&mut self, start: &K); + /// Remove all elements >= `start` in the range that contains `start` fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `tail` fn insert_item(&mut self, key: K, value: V); + /// Get an iterator over ranges fn range_iter<'c>(&'c self) -> RangeIterator<'c, K, V>; } +/// Range iterator. For each range yelds a key for the first element of the range and a vector of values. pub struct RangeIterator<'c, K:'c, V:'c> { range: usize, collection: &'c Vec<(K, Vec)> @@ -72,7 +87,6 @@ impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + } } - /// Get a range of elements from start till the end of the range fn get_tail(&mut self, key: &K) -> Range { let kv = *key; match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { diff --git a/src/sync/tests.rs b/src/sync/tests.rs index 79e1443c9..28e526aa9 100644 --- a/src/sync/tests.rs +++ b/src/sync/tests.rs @@ -7,8 +7,8 @@ use util::rlp::{self, Rlp, RlpStream, View, Stream}; use util::network::{PeerId, PacketId, Error as NetworkError}; use client::{BlockChainClient, BlockStatus, BlockNumber, TreeRoute, BlockQueueStatus, BlockChainInfo, ImportResult, QueueStatus}; use header::Header as BlockHeader; -use sync::{SyncIo}; -use sync::chain::{ChainSync}; +use sync::io::SyncIo; +use sync::chain::ChainSync; struct TestBlockChainClient { blocks: HashMap,