diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 7174ada14..8fc233796 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -15,6 +15,7 @@ ctrlc = "1.0" ethcore-util = { path = "../util" } ethcore-rpc = { path = "../rpc", optional = true } ethcore = { path = ".." } +ethsync = { path = "../sync" } clippy = "0.0.37" [features] diff --git a/bin/src/main.rs b/bin/src/main.rs index 3d4199fcf..92f0cbf20 100644 --- a/bin/src/main.rs +++ b/bin/src/main.rs @@ -8,6 +8,7 @@ extern crate docopt; extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore; +extern crate ethsync; extern crate log; extern crate env_logger; extern crate ctrlc; @@ -24,7 +25,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethcore::blockchain::CacheSize; -use ethcore::sync::EthSync; +use ethsync::EthSync; docopt!(Args derive Debug, " Parity. Ethereum Client. @@ -81,8 +82,10 @@ fn main() { let mut net_settings = NetworkConfiguration::new(); net_settings.boot_nodes = init_nodes; let mut service = ClientService::start(spec, net_settings).unwrap(); - setup_rpc_server(service.client(), service.sync()); - let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() }); + let client = service.client().clone(); + let sync = EthSync::register(service.network(), client); + setup_rpc_server(service.client(), sync.clone()); + let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: sync }); service.io().register_handler(io_handler).expect("Error registering IO handler"); let exit = Arc::new(Condvar::new()); diff --git a/src/lib.rs b/src/lib.rs index e084635dd..0652d964e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,8 +151,6 @@ mod tests; /// TODO [arkpar] Please document me pub mod client; /// TODO [arkpar] Please document me -pub mod sync; -/// TODO [arkpar] Please document me pub mod block; /// TODO [arkpar] Please document me pub mod verification; diff --git a/src/service.rs b/src/service.rs index 8c900d20a..a56ed7f44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,4 @@ use util::*; -use sync::*; use spec::Spec; use error::*; use std::env; @@ -21,7 +20,6 @@ pub type NetSyncMessage = NetworkIoMessage; pub struct ClientService { net_service: NetworkService, client: Arc, - sync: Arc, } impl ClientService { @@ -34,7 +32,6 @@ impl ClientService { dir.push(".parity"); dir.push(H64::from(spec.genesis_header().hash()).hex()); let client = try!(Client::new(spec, &dir, net_service.io().channel())); - let sync = EthSync::register(&mut net_service, client.clone()); let client_io = Arc::new(ClientIoHandler { client: client.clone() }); @@ -43,29 +40,28 @@ impl ClientService { Ok(ClientService { net_service: net_service, client: client, - sync: sync, }) } - /// Get the network service. + /// Add a node to network pub fn add_node(&mut self, _enode: &str) { unimplemented!(); } - /// TODO [arkpar] Please document me + /// Get general IO interface pub fn io(&mut self) -> &mut IoService { self.net_service.io() } - /// TODO [arkpar] Please document me + /// Get client interface pub fn client(&self) -> Arc { self.client.clone() } - - /// Get shared sync handler - pub fn sync(&self) -> Arc { - self.sync.clone() + + /// Get network service component + pub fn network(&mut self) -> &mut NetworkService { + &mut self.net_service } } diff --git a/sync/Cargo.toml b/sync/Cargo.toml new file mode 100644 index 000000000..c3ae470fd --- /dev/null +++ b/sync/Cargo.toml @@ -0,0 +1,16 @@ +[package] +description = "Ethcore blockchain sync" +name = "ethsync" +version = "0.1.0" +license = "GPL-3.0" +authors = ["Ethcore usize { @@ -100,14 +100,14 @@ pub struct SyncStatus { pub protocol_version: u8, /// BlockChain height for the moment the sync started. pub start_block_number: BlockNumber, - /// Last fully downloaded and imported block number (if any). - pub last_imported_block_number: Option, - /// Highest block number in the download queue (if any). - pub highest_block_number: Option, + /// Last fully downloaded and imported block number. + pub last_imported_block_number: BlockNumber, + /// Highest block number in the download queue. + pub highest_block_number: BlockNumber, /// Total number of blocks for the sync process. - pub blocks_total: BlockNumber, + pub blocks_total: usize, /// Number of blocks downloaded so far. - pub blocks_received: BlockNumber, + pub blocks_received: usize, /// Total number of connected peers pub num_peers: usize, /// Total number of active peers @@ -148,7 +148,7 @@ pub struct ChainSync { /// Last block number for the start of sync starting_block: BlockNumber, /// Highest block number seen - highest_block: Option, + highest_block: BlockNumber, /// Set of block header numbers being downloaded downloading_headers: HashSet, /// Set of block body numbers being downloaded @@ -162,9 +162,9 @@ pub struct ChainSync { /// Used to map body to header header_ids: HashMap, /// Last impoted block number - last_imported_block: Option, + last_imported_block: BlockNumber, /// Last impoted block hash - last_imported_hash: Option, + last_imported_hash: H256, /// Syncing total difficulty syncing_difficulty: U256, /// True if common block for our and remote chain has been found @@ -178,15 +178,15 @@ impl ChainSync { ChainSync { state: SyncState::NotSynced, starting_block: 0, - highest_block: None, + highest_block: 0, downloading_headers: HashSet::new(), downloading_bodies: HashSet::new(), headers: Vec::new(), bodies: Vec::new(), peers: HashMap::new(), header_ids: HashMap::new(), - last_imported_block: None, - last_imported_hash: None, + last_imported_block: 0, + last_imported_hash: H256::new(), syncing_difficulty: U256::from(0u64), have_common_block: false, } @@ -200,8 +200,8 @@ impl ChainSync { start_block_number: self.starting_block, last_imported_block_number: self.last_imported_block, highest_block_number: self.highest_block, - blocks_received: match self.last_imported_block { None => 0, Some(x) => x - self.starting_block }, - blocks_total: match self.highest_block { None => 0, Some(x) => x - self.starting_block }, + blocks_received: (self.last_imported_block - self.starting_block) as usize, + blocks_total: (self.highest_block - self.starting_block) as usize, num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), } @@ -230,10 +230,10 @@ impl ChainSync { /// Restart sync pub fn restart(&mut self, io: &mut SyncIo) { self.reset(); - self.last_imported_block = None; - self.last_imported_hash = None; + self.last_imported_block = 0; + self.last_imported_hash = H256::new(); self.starting_block = 0; - self.highest_block = None; + self.highest_block = 0; self.have_common_block = false; io.chain().clear_queue(); self.starting_block = io.chain().chain_info().best_block_number; @@ -294,27 +294,25 @@ impl ChainSync { for i in 0..item_count { let info: BlockHeader = try!(r.val_at(i)); let number = BlockNumber::from(info.number); - if number <= self.current_base_block() || self.headers.have_item(&number) { + if number <= self.last_imported_block || self.headers.have_item(&number) { trace!(target: "sync", "Skipping existing block header"); continue; } - - if self.highest_block == None || number > self.highest_block.unwrap() { - self.highest_block = Some(number); + if number > self.highest_block { + self.highest_block = number; } let hash = info.hash(); match io.chain().block_status(&hash) { BlockStatus::InChain => { self.have_common_block = true; - self.last_imported_block = Some(number); - self.last_imported_hash = Some(hash.clone()); + self.last_imported_block = number; + self.last_imported_hash = hash.clone(); trace!(target: "sync", "Found common header {} ({})", number, hash); }, _ => { if self.have_common_block { //validate chain - let base_hash = self.last_imported_hash.clone().unwrap(); - if self.have_common_block && number == self.current_base_block() + 1 && info.parent_hash != base_hash { + if self.have_common_block && number == self.last_imported_block + 1 && info.parent_hash != self.last_imported_hash { // TODO: lower peer rating debug!(target: "sync", "Mismatched block header {} {}", number, hash); continue; @@ -410,7 +408,7 @@ impl ChainSync { trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); let header_view = HeaderView::new(header_rlp.as_raw()); // TODO: Decompose block and add to self.headers and self.bodies instead - if header_view.number() == From::from(self.current_base_block() + 1) { + if header_view.number() == From::from(self.last_imported_block + 1) { match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "New block already in chain {:?}", h); @@ -553,10 +551,6 @@ impl ChainSync { } } - fn current_base_block(&self) -> BlockNumber { - match self.last_imported_block { None => 0, Some(x) => x } - } - /// Find some headers or blocks to download for a peer. fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId) { self.clear_peer_download(peer_id); @@ -570,7 +564,7 @@ impl ChainSync { let mut needed_bodies: Vec = Vec::new(); let mut needed_numbers: Vec = Vec::new(); - if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 { + if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 { for (start, ref items) in self.headers.range_iter() { if needed_bodies.len() > MAX_BODIES_TO_REQUEST { break; @@ -603,12 +597,12 @@ impl ChainSync { } if start == 0 { self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); + self.last_imported_hash = chain_info.genesis_hash; } } if self.have_common_block { let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; + let mut prev = self.last_imported_block + 1; for (next, ref items) in self.headers.range_iter() { if !headers.is_empty() { break; @@ -663,7 +657,7 @@ impl ChainSync { { let headers = self.headers.range_iter().next().unwrap(); let bodies = self.bodies.range_iter().next().unwrap(); - if headers.0 != bodies.0 || headers.0 != self.current_base_block() + 1 { + if headers.0 != bodies.0 || headers.0 != self.last_imported_block + 1 { return; } @@ -676,21 +670,27 @@ impl ChainSync { block_rlp.append_raw(body.at(0).as_raw(), 1); block_rlp.append_raw(body.at(1).as_raw(), 1); let h = &headers.1[i].hash; + // Perform basic block verification + if !Block::is_good(block_rlp.as_raw()) { + debug!(target: "sync", "Bad block rlp {:?} : {:?}", h, block_rlp.as_raw()); + restart = true; + break; + } match io.chain().import_block(block_rlp.out()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); }, Err(ImportError::AlreadyQueued) => { trace!(target: "sync", "Block already queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); }, Ok(_) => { trace!(target: "sync", "Block queued {:?}", h); - self.last_imported_block = Some(headers.0 + i as BlockNumber); - self.last_imported_hash = Some(h.clone()); + self.last_imported_block = headers.0 + i as BlockNumber; + self.last_imported_hash = h.clone(); imported += 1; }, Err(e) => { @@ -707,8 +707,8 @@ impl ChainSync { return; } - self.headers.remove_head(&(self.last_imported_block.unwrap() + 1)); - self.bodies.remove_head(&(self.last_imported_block.unwrap() + 1)); + self.headers.remove_head(&(self.last_imported_block + 1)); + self.bodies.remove_head(&(self.last_imported_block + 1)); if self.headers.is_empty() { assert!(self.bodies.is_empty()); diff --git a/src/sync/io.rs b/sync/src/io.rs similarity index 96% rename from src/sync/io.rs rename to sync/src/io.rs index 501d36ad2..4425a2555 100644 --- a/src/sync/io.rs +++ b/sync/src/io.rs @@ -1,7 +1,7 @@ -use client::BlockChainClient; +use ethcore::client::BlockChainClient; use util::{NetworkContext, PeerId, PacketId,}; use util::error::UtilError; -use service::SyncMessage; +use ethcore::service::SyncMessage; /// IO interface for the syning handler. /// Provides peer connection management and an interface to the blockchain client. diff --git a/src/sync/mod.rs b/sync/src/lib.rs similarity index 65% rename from src/sync/mod.rs rename to sync/src/lib.rs index 34a1e429d..09f3eb521 100644 --- a/src/sync/mod.rs +++ b/sync/src/lib.rs @@ -1,34 +1,46 @@ -/// Blockchain sync module -/// Implements ethereum protocol version 63 as specified here: -/// https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol -/// -/// Usage example: -/// -/// ```rust -/// extern crate ethcore_util as util; -/// extern crate ethcore; -/// use std::env; -/// use std::sync::Arc; -/// use util::network::{NetworkService, NetworkConfiguration}; -/// use ethcore::client::Client; -/// use ethcore::sync::EthSync; -/// use ethcore::ethereum; -/// -/// fn main() { -/// let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); -/// let dir = env::temp_dir(); -/// let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); -/// EthSync::register(&mut service, client); -/// } -/// ``` +#![warn(missing_docs)] +#![feature(plugin)] +#![plugin(clippy)] +#![feature(augmented_assignments)] +//! Blockchain sync module +//! Implements ethereum protocol version 63 as specified here: +//! https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol +//! +//! Usage example: +//! +//! ```rust +//! extern crate ethcore_util as util; +//! extern crate ethcore; +//! extern crate ethsync; +//! use std::env; +//! use std::sync::Arc; +//! use util::network::{NetworkService, NetworkConfiguration}; +//! use ethcore::client::Client; +//! use ethsync::EthSync; +//! use ethcore::ethereum; +//! +//! fn main() { +//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap(); +//! let dir = env::temp_dir(); +//! let client = Client::new(ethereum::new_frontier(), &dir, service.io().channel()).unwrap(); +//! EthSync::register(&mut service, client); +//! } +//! ``` + +#[macro_use] +extern crate log; +#[macro_use] +extern crate ethcore_util as util; +extern crate ethcore; +extern crate env_logger; use std::ops::*; use std::sync::*; -use client::Client; +use ethcore::client::Client; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; -use sync::chain::ChainSync; -use service::SyncMessage; -use sync::io::NetSyncIo; +use chain::ChainSync; +use ethcore::service::SyncMessage; +use io::NetSyncIo; mod chain; mod io; diff --git a/src/sync/range_collection.rs b/sync/src/range_collection.rs similarity index 100% rename from src/sync/range_collection.rs rename to sync/src/range_collection.rs diff --git a/sync/src/service.rs b/sync/src/service.rs new file mode 100644 index 000000000..8c900d20a --- /dev/null +++ b/sync/src/service.rs @@ -0,0 +1,104 @@ +use util::*; +use sync::*; +use spec::Spec; +use error::*; +use std::env; +use client::Client; + +/// Message type for external and internal events +#[derive(Clone)] +pub enum SyncMessage { + /// New block has been imported into the blockchain + NewChainBlock(Bytes), //TODO: use Cow + /// A block is ready + BlockVerified, +} + +/// TODO [arkpar] Please document me +pub type NetSyncMessage = NetworkIoMessage; + +/// Client service setup. Creates and registers client and network services with the IO subsystem. +pub struct ClientService { + net_service: NetworkService, + client: Arc, + sync: Arc, +} + +impl ClientService { + /// Start the service in a separate thread. + pub fn start(spec: Spec, net_config: NetworkConfiguration) -> Result { + let mut net_service = try!(NetworkService::start(net_config)); + info!("Starting {}", net_service.host_info()); + info!("Configured for {} using {} engine", spec.name, spec.engine_name); + let mut dir = env::home_dir().unwrap(); + dir.push(".parity"); + dir.push(H64::from(spec.genesis_header().hash()).hex()); + let client = try!(Client::new(spec, &dir, net_service.io().channel())); + let sync = EthSync::register(&mut net_service, client.clone()); + let client_io = Arc::new(ClientIoHandler { + client: client.clone() + }); + try!(net_service.io().register_handler(client_io)); + + Ok(ClientService { + net_service: net_service, + client: client, + sync: sync, + }) + } + + /// Get the network service. + pub fn add_node(&mut self, _enode: &str) { + unimplemented!(); + } + + /// TODO [arkpar] Please document me + pub fn io(&mut self) -> &mut IoService { + self.net_service.io() + } + + /// TODO [arkpar] Please document me + pub fn client(&self) -> Arc { + self.client.clone() + + } + + /// Get shared sync handler + pub fn sync(&self) -> Arc { + self.sync.clone() + } +} + +/// IO interface for the Client handler +struct ClientIoHandler { + client: Arc +} + +const CLIENT_TICK_TIMER: TimerToken = 0; +const CLIENT_TICK_MS: u64 = 5000; + +impl IoHandler for ClientIoHandler { + fn initialize(&self, io: &IoContext) { + io.register_timer(CLIENT_TICK_TIMER, CLIENT_TICK_MS).expect("Error registering client timer"); + } + + fn timeout(&self, _io: &IoContext, timer: TimerToken) { + if timer == CLIENT_TICK_TIMER { + self.client.tick(); + } + } + + #[allow(match_ref_pats)] + #[allow(single_match)] + fn message(&self, io: &IoContext, net_message: &NetSyncMessage) { + if let &UserMessage(ref message) = net_message { + match message { + &SyncMessage::BlockVerified => { + self.client.import_verified_blocks(&io.channel()); + }, + _ => {}, // ignore other messages + } + } + } +} + diff --git a/src/sync/tests.rs b/sync/src/tests.rs similarity index 83% rename from src/sync/tests.rs rename to sync/src/tests.rs index eb09b467a..41516ef60 100644 --- a/src/sync/tests.rs +++ b/sync/src/tests.rs @@ -1,10 +1,10 @@ use util::*; -use client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; -use block_queue::BlockQueueInfo; -use header::{Header as BlockHeader, BlockNumber}; -use error::*; -use sync::io::SyncIo; -use sync::chain::{ChainSync, SyncState}; +use ethcore::client::{BlockChainClient, BlockStatus, TreeRoute, BlockChainInfo}; +use ethcore::block_queue::BlockQueueInfo; +use ethcore::header::{Header as BlockHeader, BlockNumber}; +use ethcore::error::*; +use io::SyncIo; +use chain::ChainSync; struct TestBlockChainClient { blocks: RwLock>, @@ -38,7 +38,11 @@ impl TestBlockChainClient { header.number = n as BlockNumber; let mut uncles = RlpStream::new_list(if empty {0} else {1}); if !empty { - uncles.append(&H256::from(&U256::from(n))); + let mut uncle_header = BlockHeader::new(); + uncle_header.difficulty = From::from(n); + uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.number = n as BlockNumber; + uncles.append(&uncle_header); header.uncles_hash = uncles.as_raw().sha3(); } let mut rlp = RlpStream::new_list(3); @@ -241,15 +245,13 @@ struct TestPeer { } struct TestNet { - peers: Vec, - started: bool + peers: Vec } impl TestNet { pub fn new(n: usize) -> TestNet { let mut net = TestNet { peers: Vec::new(), - started: false }; for _ in 0..n { net.peers.push(TestPeer { @@ -293,28 +295,10 @@ impl TestNet { } } - pub fn restart_peer(&mut self, i: usize) { - let peer = self.peer_mut(i); - peer.sync.restart(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); - } - - pub fn sync(&mut self) -> u32 { + pub fn sync(&mut self) { self.start(); - let mut total_steps = 0; while !self.done() { - self.sync_step(); - total_steps = total_steps + 1; - } - total_steps - } - - pub fn sync_steps(&mut self, count: usize) { - if !self.started { - self.start(); - self.started = true; - } - for _ in 0..count { - self.sync_step(); + self.sync_step() } } @@ -323,8 +307,10 @@ impl TestNet { } } + #[test] -fn chain_two_peers() { +fn full_sync_two_peers() { + ::env_logger::init().ok(); let mut net = TestNet::new(3); net.peer_mut(1).chain.add_blocks(1000, false); net.peer_mut(2).chain.add_blocks(1000, false); @@ -334,26 +320,8 @@ fn chain_two_peers() { } #[test] -fn chain_status_after_sync() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - net.sync(); - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::Idle); -} - -#[test] -fn chain_takes_few_steps() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, false); - net.peer_mut(2).chain.add_blocks(100, false); - let total_steps = net.sync(); - assert!(total_steps < 7); -} - -#[test] -fn chain_empty_blocks() { +fn full_sync_empty_blocks() { + ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); @@ -365,7 +333,8 @@ fn chain_empty_blocks() { } #[test] -fn chain_forked() { +fn forked_sync() { + ::env_logger::init().ok(); let mut net = TestNet::new(3); net.peer_mut(0).chain.add_blocks(300, false); net.peer_mut(1).chain.add_blocks(300, false); @@ -382,25 +351,3 @@ fn chain_forked() { assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain); assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain); } - -#[test] -fn chain_restart() { - let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); - - net.sync_steps(8); - - // make sure that sync has actually happened - assert!(net.peer(0).chain.chain_info().best_block_number > 100); - net.restart_peer(0); - - let status = net.peer(0).sync.status(); - assert_eq!(status.state, SyncState::NotSynced); -} - -#[test] -fn chain_status_empty() { - let net = TestNet::new(2); - assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced); -}