diff --git a/Cargo.lock b/Cargo.lock index 97cf6d7eb..94922c70e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,7 @@ dependencies = [ "ethjson 0.1.0", "ethstore 0.1.0", "heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -376,6 +377,7 @@ dependencies = [ name = "ethcore-util" version = "1.2.1" dependencies = [ + "ansi_term 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", "arrayvec 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)", "bigint 0.1.0", "chrono 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", @@ -547,7 +549,7 @@ dependencies = [ [[package]] name = "hyper" version = "0.9.4" -source = "git+https://github.com/ethcore/hyper#7ccfcb2aa7e6aa6300efa8cebd6a0e6ce55582ea" +source = "git+https://github.com/ethcore/hyper#9e346c1d4bc30cd4142dea9d8a0b117d30858ca4" dependencies = [ "cookie 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -556,7 +558,7 @@ dependencies = [ "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "rotor 0.6.3 (git+https://github.com/ethcore/rotor)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", - "spmc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1222,7 +1224,7 @@ dependencies = [ [[package]] name = "spmc" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 515091a16..d6785a254 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -32,6 +32,10 @@ bloomchain = "0.1" rayon = "0.3.1" ethstore = { path = "../ethstore" } +[dependencies.hyper] +git = "https://github.com/ethcore/hyper" +default-features = false + [features] jit = ["evmjit"] evm-debug = [] diff --git a/ethcore/src/block.rs b/ethcore/src/block.rs index 103435f40..13a2024d9 100644 --- a/ethcore/src/block.rs +++ b/ethcore/src/block.rs @@ -164,6 +164,12 @@ pub trait IsBlock { fn uncles(&self) -> &Vec
{ &self.block().base.uncles } } +/// Trait for a object that has a state database. +pub trait Drain { + /// Drop this object and return the underlieing database. + fn drain(self) -> Box; +} + impl IsBlock for ExecutedBlock { fn block(&self) -> &ExecutedBlock { self } } @@ -436,9 +442,11 @@ impl LockedBlock { _ => Ok(SealedBlock { block: s.block, uncle_bytes: s.uncle_bytes }), } } +} +impl Drain for LockedBlock { /// Drop this object and return the underlieing database. - pub fn drain(self) -> Box { self.block.state.drop().1 } + fn drain(self) -> Box { self.block.state.drop().1 } } impl SealedBlock { @@ -450,9 +458,11 @@ impl SealedBlock { block_rlp.append_raw(&self.uncle_bytes, 1); block_rlp.out() } +} +impl Drain for SealedBlock { /// Drop this object and return the underlieing database. - pub fn drain(self) -> Box { self.block.state.drop().1 } + fn drain(self) -> Box { self.block.state.drop().1 } } impl IsBlock for SealedBlock { diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 5d157b654..d5e509e62 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -249,7 +249,7 @@ impl Client { Ok(locked_block) } - fn calculate_enacted_retracted(&self, import_results: Vec) -> (Vec, Vec) { + fn calculate_enacted_retracted(&self, import_results: &[ImportRoute]) -> (Vec, Vec) { fn map_to_vec(map: Vec<(H256, bool)>) -> Vec { map.into_iter().map(|(k, _v)| k).collect() } @@ -259,12 +259,12 @@ impl Client { // could be retracted in import `k+1`. This is why to understand if after all inserts // the block is enacted or retracted we iterate over all routes and at the end final state // will be in the hashmap - let map = import_results.into_iter().fold(HashMap::new(), |mut map, route| { - for hash in route.enacted { - map.insert(hash, true); + let map = import_results.iter().fold(HashMap::new(), |mut map, route| { + for hash in &route.enacted { + map.insert(hash.clone(), true); } - for hash in route.retracted { - map.insert(hash, false); + for hash in &route.retracted { + map.insert(hash.clone(), false); } map }); @@ -301,36 +301,10 @@ impl Client { invalid_blocks.insert(header.hash()); continue; } + let closed_block = closed_block.unwrap(); imported_blocks.push(header.hash()); - // Are we committing an era? 
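// Editor's aside (hedged sketch, not part of the patch): the `calculate_enacted_retracted`
// fold above relies on "last route wins" per hash — a block enacted by import `k` may be
// retracted again by import `k+1`, so only the final state per hash matters. The same idea,
// self-contained with std types only (plain u64 stands in for H256 here):
use std::collections::HashMap;

fn enacted_retracted(routes: &[(Vec<u64>, Vec<u64>)]) -> (Vec<u64>, Vec<u64>) {
    // Each route lists (enacted, retracted) hashes in import order.
    let mut last_state = HashMap::new();
    for &(ref enacted, ref retracted) in routes {
        for h in enacted { last_state.insert(*h, true); }    // may be flipped by a later route
        for h in retracted { last_state.insert(*h, false); } // ...and flipped back again
    }
    // Partition by the final verdict.
    let enacted = last_state.iter().filter(|&(_, &e)| e).map(|(h, _)| *h).collect();
    let retracted = last_state.iter().filter(|&(_, &e)| !e).map(|(h, _)| *h).collect();
    (enacted, retracted)
}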
- let ancient = if header.number() >= HISTORY { - let n = header.number() - HISTORY; - Some((n, self.chain.block_hash(n).unwrap())) - } else { - None - }; - - // Commit results - let closed_block = closed_block.unwrap(); - let receipts = closed_block.block().receipts().clone(); - let traces = From::from(closed_block.block().traces().clone().unwrap_or_else(Vec::new)); - - closed_block.drain() - .commit(header.number(), &header.hash(), ancient) - .expect("State DB commit failed."); - - // And update the chain after commit to prevent race conditions - // (when something is in chain but you are not able to fetch details) - let route = self.chain.insert_block(&block.bytes, receipts); - self.tracedb.import(TraceImportRequest { - traces: traces, - block_hash: header.hash(), - block_number: header.number(), - enacted: route.enacted.clone(), - retracted: route.retracted.len() - }); - + let route = self.commit_block(closed_block, &header.hash(), &block.bytes); import_results.push(route); self.report.write().unwrap().accrue_block(&block); @@ -351,7 +325,7 @@ impl Client { { if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() { - let (enacted, retracted) = self.calculate_enacted_retracted(import_results); + let (enacted, retracted) = self.calculate_enacted_retracted(&import_results); if self.queue_info().is_empty() { self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted); @@ -362,19 +336,50 @@ impl Client { invalid: invalid_blocks, enacted: enacted, retracted: retracted, + sealed: Vec::new(), })).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); } } - { - if self.chain_info().best_block_hash != original_best { - self.miner.update_sealing(self); - } + if self.chain_info().best_block_hash != original_best { + self.miner.update_sealing(self); } imported } + fn commit_block(&self, block: B, hash: &H256, block_data: &Bytes) -> ImportRoute where B: IsBlock + Drain { + let number = block.header().number(); + // Are we committing an era? + let ancient = if number >= HISTORY { + let n = number - HISTORY; + Some((n, self.chain.block_hash(n).unwrap())) + } else { + None + }; + + // Commit results + let receipts = block.receipts().clone(); + let traces = From::from(block.traces().clone().unwrap_or_else(Vec::new)); + + // CHECK! I *think* this is fine, even if the state_root is equal to another + // already-imported block of the same number. + // TODO: Prove it with a test. 
+ block.drain().commit(number, hash, ancient).expect("State DB commit failed."); + + // And update the chain after commit to prevent race conditions + // (when something is in chain but you are not able to fetch details) + let route = self.chain.insert_block(block_data, receipts); + self.tracedb.import(TraceImportRequest { + traces: traces, + block_hash: hash.clone(), + block_number: number, + enacted: route.enacted.clone(), + retracted: route.retracted.len() + }); + route + } + /// Import transactions from the IO queue pub fn import_queued_transactions(&self, transactions: &[Bytes]) -> usize { let _timer = PerfTimer::new("import_queued_transactions"); @@ -830,6 +835,39 @@ impl MiningBlockChainClient for Client { fn vm_factory(&self) -> &EvmFactory { &self.vm_factory } + + fn import_sealed_block(&self, block: SealedBlock) -> ImportResult { + let _import_lock = self.import_lock.lock(); + let _timer = PerfTimer::new("import_sealed_block"); + + let original_best = self.chain_info().best_block_hash; + + let h = block.header().hash(); + let number = block.header().number(); + + let block_data = block.rlp_bytes(); + let route = self.commit_block(block, &h, &block_data); + trace!(target: "client", "Imported sealed block #{} ({})", number, h); + + { + let (enacted, retracted) = self.calculate_enacted_retracted(&[route]); + self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted); + + self.io_channel.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { + imported: vec![h.clone()], + invalid: vec![], + enacted: enacted, + retracted: retracted, + sealed: vec![h.clone()], + })).unwrap_or_else(|e| warn!("Error sending IO notification: {:?}", e)); + } + + if self.chain_info().best_block_hash != original_best { + self.miner.update_sealing(self); + } + + Ok(h) + } } impl MayPanic for Client { diff --git a/ethcore/src/client/mod.rs b/ethcore/src/client/mod.rs index bef814b4e..7f3c3bb3a 100644 --- a/ethcore/src/client/mod.rs +++ b/ethcore/src/client/mod.rs @@ -37,7 +37,7 @@ use util::numbers::U256; use util::Itertools; use blockchain::TreeRoute; use block_queue::BlockQueueInfo; -use block::OpenBlock; +use block::{OpenBlock, SealedBlock}; use header::{BlockNumber, Header}; use transaction::{LocalizedTransaction, SignedTransaction}; use log_entry::LocalizedLogEntry; @@ -253,4 +253,7 @@ pub trait MiningBlockChainClient : BlockChainClient { /// Returns EvmFactory. fn vm_factory(&self) -> &EvmFactory; + + /// Import sealed block. Skips all verifications. 
+ fn import_sealed_block(&self, block: SealedBlock) -> ImportResult; } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index ed1f10e09..f51f978de 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -32,7 +32,7 @@ use miner::{Miner, MinerService}; use spec::Spec; use block_queue::BlockQueueInfo; -use block::OpenBlock; +use block::{OpenBlock, SealedBlock}; use executive::Executed; use error::{ExecutionError}; use trace::LocalizedTrace; @@ -248,6 +248,10 @@ impl MiningBlockChainClient for TestBlockChainClient { fn vm_factory(&self) -> &EvmFactory { unimplemented!(); } + + fn import_sealed_block(&self, _block: SealedBlock) -> ImportResult { + unimplemented!(); + } } impl BlockChainClient for TestBlockChainClient { diff --git a/ethcore/src/ethereum/ethash.rs b/ethcore/src/ethereum/ethash.rs index 3400220db..84c2a9608 100644 --- a/ethcore/src/ethereum/ethash.rs +++ b/ethcore/src/ethereum/ethash.rs @@ -14,9 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -extern crate ethash; - -use self::ethash::{quick_get_difficulty, EthashManager, H256 as EH256}; +use ethash::{quick_get_difficulty, EthashManager, H256 as EH256}; use common::*; use block::*; use spec::CommonParams; diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 9919ec62a..54a944331 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -91,6 +91,8 @@ extern crate ethjson; extern crate bloomchain; #[macro_use] extern crate ethcore_ipc as ipc; extern crate rayon; +extern crate hyper; +extern crate ethash; pub extern crate ethstore; #[cfg(test)] extern crate ethcore_devtools as devtools; diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 4518e416a..7b101afc7 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicBool; use std::time::{Instant, Duration}; use util::*; +use util::Colour::White; use account_provider::AccountProvider; use views::{BlockView, HeaderView}; use client::{MiningBlockChainClient, Executive, Executed, EnvInfo, TransactOptions, BlockID, CallAnalytics}; @@ -29,6 +30,7 @@ use receipt::{Receipt}; use spec::Spec; use engine::Engine; use miner::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin}; +use miner::work_notify::WorkPoster; /// Different possible definitions for pending transaction set. #[derive(Debug)] @@ -45,6 +47,8 @@ pub enum PendingSet { /// Configures the behaviour of the miner. #[derive(Debug)] pub struct MinerOptions { + /// URLs to notify when there is new work. + pub new_work_notify: Vec, /// Force the miner to reseal, even when nobody has asked for work. pub force_sealing: bool, /// Reseal on receipt of new external transactions. @@ -61,11 +65,14 @@ pub struct MinerOptions { pub pending_set: PendingSet, /// How many historical work packages can we store before running out? pub work_queue_size: usize, + /// Can we submit two different solutions for the same block and expect both to result in an import? 
+ pub enable_resubmission: bool, } impl Default for MinerOptions { fn default() -> Self { MinerOptions { + new_work_notify: vec![], force_sealing: false, reseal_on_external_tx: true, reseal_on_own_tx: true, @@ -74,6 +81,7 @@ impl Default for MinerOptions { pending_set: PendingSet::AlwaysQueue, reseal_min_period: Duration::from_secs(0), work_queue_size: 20, + enable_resubmission: true, } } } @@ -95,6 +103,7 @@ pub struct Miner { spec: Spec, accounts: Option>, + work_poster: Option, } impl Miner { @@ -112,23 +121,26 @@ impl Miner { extra_data: RwLock::new(Vec::new()), accounts: None, spec: spec, + work_poster: None, } } /// Creates new instance of miner pub fn new(options: MinerOptions, spec: Spec, accounts: Option>) -> Arc { + let work_poster = if !options.new_work_notify.is_empty() { Some(WorkPoster::new(&options.new_work_notify)) } else { None }; Arc::new(Miner { transaction_queue: Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)), - sealing_enabled: AtomicBool::new(options.force_sealing), + sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()), next_allowed_reseal: Mutex::new(Instant::now()), sealing_block_last_request: Mutex::new(0), sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)), gas_range_target: RwLock::new((U256::zero(), U256::zero())), author: RwLock::new(Address::default()), extra_data: RwLock::new(Vec::new()), - accounts: accounts, options: options, + accounts: accounts, spec: spec, + work_poster: work_poster, }) } @@ -136,15 +148,20 @@ impl Miner { self.spec.engine.deref() } + fn forced_sealing(&self) -> bool { + self.options.force_sealing || !self.options.new_work_notify.is_empty() + } + /// Prepares new block for sealing including top transactions from queue. #[cfg_attr(feature="dev", allow(match_same_arms))] #[cfg_attr(feature="dev", allow(cyclomatic_complexity))] fn prepare_sealing(&self, chain: &MiningBlockChainClient) { trace!(target: "miner", "prepare_sealing: entering"); - let (transactions, mut open_block) = { + let (transactions, mut open_block, last_work_hash) = { let transactions = {self.transaction_queue.lock().unwrap().top_transactions()}; let mut sealing_work = self.sealing_work.lock().unwrap(); + let last_work_hash = sealing_work.peek_last_ref().map(|pb| pb.block().fields().header.hash()); let best_hash = chain.best_block_header().sha3(); /* // check to see if last ClosedBlock in would_seals is actually same parent block. 
@@ -171,7 +188,7 @@ impl Miner { ) } }; - (transactions, open_block) + (transactions, open_block, last_work_hash) }; let mut invalid_transactions = HashSet::new(); @@ -237,13 +254,23 @@ impl Miner { } } - let mut sealing_work = self.sealing_work.lock().unwrap(); - if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) { - trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); - sealing_work.push(block); - } - - trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash())); + let work = { + let mut sealing_work = self.sealing_work.lock().unwrap(); + trace!(target: "miner", "Checking whether we need to reseal: last={:?}, this={:?}", last_work_hash, block.block().fields().header.hash()); + let work = if last_work_hash.map_or(true, |h| h != block.block().fields().header.hash()) { + trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); + let pow_hash = block.block().fields().header.hash(); + let number = block.block().fields().header.number(); + let difficulty = *block.block().fields().header.difficulty(); + sealing_work.push(block); + Some((pow_hash, difficulty, number)) + } else { + None + }; + trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash())); + work + }; + work.map(|(pow_hash, difficulty, number)| self.work_poster.as_ref().map(|ref p| p.notify(pow_hash, difficulty, number))); } fn update_gas_limit(&self, chain: &MiningBlockChainClient) { @@ -562,7 +589,7 @@ impl MinerService for Miner { let current_no = chain.chain_info().best_block_number; let has_local_transactions = self.transaction_queue.lock().unwrap().has_local_pending_transactions(); let last_request = *self.sealing_block_last_request.lock().unwrap(); - let should_disable_sealing = !self.options.force_sealing + let should_disable_sealing = !self.forced_sealing() && !has_local_transactions && current_no > last_request && current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS; @@ -589,26 +616,22 @@ impl MinerService for Miner { } fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { - if let Some(b) = self.sealing_work.lock().unwrap().take_used_if(|b| &b.hash() == &pow_hash) { - match b.lock().try_seal(self.engine(), seal) { - Err(_) => { - info!(target: "miner", "Mined block rejected, PoW was invalid."); - Err(Error::PowInvalid) - } - Ok(sealed) => { - info!(target: "miner", "New block mined, hash: {}", sealed.header().hash()); - // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. 
- let b = sealed.rlp_bytes(); - let h = b.sha3(); - try!(chain.import_block(b)); - info!("Block {} submitted and imported.", h); - Ok(()) - } - } + let result = if let Some(b) = self.sealing_work.lock().unwrap().get_used_if(if self.options.enable_resubmission { GetAction::Clone } else { GetAction::Take }, |b| &b.hash() == &pow_hash) { + b.lock().try_seal(self.engine(), seal).or_else(|_| { + warn!(target: "miner", "Mined solution rejected: Invalid."); + Err(Error::PowInvalid) + }) } else { - info!(target: "miner", "Mined block rejected, PoW hash invalid or out of date."); + warn!(target: "miner", "Mined solution rejected: Block unknown or out of date."); Err(Error::PowHashInvalid) - } + }; + result.and_then(|sealed| { + let n = sealed.header().number(); + let h = sealed.header().hash(); + try!(chain.import_sealed_block(sealed)); + info!(target: "miner", "Mined block imported OK. #{}: {}", paint(White.bold(), format!("{}", n)), paint(White.bold(), h.hex())); + Ok(()) + }) } fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) { diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index e65d6048a..152bd1a61 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -45,6 +45,7 @@ mod miner; mod external; mod transaction_queue; +mod work_notify; pub use self::transaction_queue::{TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin}; pub use self::miner::{Miner, MinerOptions, PendingSet}; diff --git a/ethcore/src/miner/work_notify.rs b/ethcore/src/miner/work_notify.rs new file mode 100644 index 000000000..a153be79f --- /dev/null +++ b/ethcore/src/miner/work_notify.rs @@ -0,0 +1,115 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . 
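// Editor's aside (hedged, not part of the patch): the module below notifies each URL given
// via `--notify-work` with an eth_getWork-style JSON body extended by the block number. An
// illustrative helper showing only the payload shape — it mirrors the `format!` call in
// `WorkPoster::notify` further down; the hex strings are assumed to be unprefixed:
fn work_notification_body(pow_hash_hex: &str, seed_hash_hex: &str, target_hex: &str, number: u64) -> String {
    format!(r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#,
        pow_hash_hex, seed_hash_hex, target_hex, number)
}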
+ +extern crate hyper; + +use hyper::header::ContentType; +use hyper::method::Method; +use hyper::client::{Request, Response, Client}; +use hyper::{Next}; +use hyper::net::HttpStream; +use ethash::SeedHashCompute; +use hyper::Url; +use util::*; +use ethereum::ethash::Ethash; + +pub struct WorkPoster { + urls: Vec, + client: Mutex>, + seed_compute: Mutex, +} + +impl WorkPoster { + pub fn new(urls: &[String]) -> Self { + let urls = urls.into_iter().filter_map(|u| { + match Url::parse(&u) { + Ok(url) => Some(url), + Err(e) => { + warn!("Error parsing URL {} : {}", u, e); + None + } + } + }).collect(); + let client = WorkPoster::create_client(); + WorkPoster { + client: Mutex::new(client), + urls: urls, + seed_compute: Mutex::new(SeedHashCompute::new()), + } + } + + fn create_client() -> Client { + let client = Client::::configure() + .keep_alive(true) + .build().expect("Error creating HTTP client") as Client; + client + } + + pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { + // TODO: move this to engine + let target = Ethash::difficulty_to_boundary(&difficulty); + let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(number); + let seed_hash = H256::from_slice(&seed_hash[..]); + let body = format!(r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#, + pow_hash.hex(), seed_hash.hex(), target.hex(), number); + let mut client = self.client.lock().unwrap(); + for u in &self.urls { + if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) { + warn!("Error sending HTTP notification to {} : {}, retrying", u, e); + // TODO: remove this once https://github.com/hyperium/hyper/issues/848 is fixed + *client = WorkPoster::create_client(); + if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) { + warn!("Error sending HTTP notification to {} : {}", u, e); + } + } + } + } +} + +struct PostHandler { + body: String, +} + +impl hyper::client::Handler for PostHandler { + fn on_request(&mut self, request: &mut Request) -> Next { + request.set_method(Method::Post); + request.headers_mut().set(ContentType::json()); + Next::write() + } + + fn on_request_writable(&mut self, encoder: &mut hyper::Encoder) -> Next { + if let Err(e) = encoder.write_all(self.body.as_bytes()) { + trace!("Error posting work data: {}", e); + } + encoder.close(); + Next::read() + + } + + fn on_response(&mut self, _response: Response) -> Next { + Next::end() + } + + fn on_response_readable(&mut self, _decoder: &mut hyper::Decoder) -> Next { + Next::end() + } + + fn on_error(&mut self, err: hyper::Error) -> Next { + trace!("Error posting work data: {}", err); + Next::end() + } +} + diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 98fb3ad23..c4cbc497b 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -17,6 +17,7 @@ //! Creates and registers client and network services. 
use util::*; +use util::Colour::{Yellow, White}; use util::panics::*; use spec::Spec; use error::*; @@ -36,6 +37,8 @@ pub enum SyncMessage { retracted: Vec, /// Hashes of blocks that are now included in cannonical chain enacted: Vec, + /// Hashes of blocks that are sealed by this node + sealed: Vec, }, /// Best Block Hash in chain has been changed NewChainHead, @@ -69,8 +72,7 @@ impl ClientService { try!(net_service.start()); } - info!("Starting {}", net_service.host_info()); - info!("Configured for {} using {:?} engine", spec.name, spec.engine.name()); + info!("Configured for {} using {} engine", paint(White.bold(), spec.name.clone()), paint(Yellow.bold(), spec.engine.name().to_owned())); let client = try!(Client::new(config, spec, db_path, miner, net_service.io().channel())); panic_handler.forward_from(client.deref()); let client_io = Arc::new(ClientIoHandler { diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 15b346919..70a644896 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -17,7 +17,7 @@ use client::{BlockChainClient, Client, ClientConfig}; use common::*; use spec::*; -use block::{OpenBlock}; +use block::{OpenBlock, Drain}; use blockchain::{BlockChain, Config as BlockChainConfig}; use state::*; use evm::Schedule; diff --git a/parity/cli.rs b/parity/cli.rs index a48ce3aa7..7ebbcb0aa 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -169,6 +169,12 @@ Sealing/Mining Options: more than 32 characters. --tx-queue-size LIMIT Maximum amount of transactions in the queue (waiting to be included in next block) [default: 1024]. + --remove-solved Move solved blocks from the work package queue + instead of cloning them. This gives a slightly + faster import speed, but means that extra solutions + submitted for the same work package will go unused. + --notify-work URLS URLs to which work package notifications are pushed. + URLS should be a comma-delimited list of HTTP URLs. Footprint Options: --tracing BOOL Indicates if full transaction tracing should be @@ -311,6 +317,7 @@ pub struct Args { pub flag_reseal_on_txs: String, pub flag_reseal_min_period: u64, pub flag_work_queue_size: usize, + pub flag_remove_solved: bool, pub flag_tx_gas_limit: Option, pub flag_relay_set: String, pub flag_author: Option, @@ -320,6 +327,7 @@ pub struct Args { pub flag_gas_cap: String, pub flag_extra_data: Option, pub flag_tx_queue_size: usize, + pub flag_notify_work: Option, pub flag_logging: Option, pub flag_version: bool, pub flag_from: String, diff --git a/parity/configuration.rs b/parity/configuration.rs index e95ef4233..fb31bf7ad 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -25,6 +25,7 @@ use docopt::Docopt; use die::*; use util::*; +use util::log::Colour::*; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; use ethcore::client::{append_path, get_db_path, ClientConfig, DatabaseCompactionProfile, Switch, VMType}; @@ -84,6 +85,10 @@ impl Configuration { ) } + fn work_notify(&self) -> Vec { + self.args.flag_notify_work.as_ref().map_or_else(Vec::new, |s| s.split(',').map(|s| s.to_owned()).collect()) + } + pub fn miner_options(&self) -> MinerOptions { let (own, ext) = match self.args.flag_reseal_on_txs.as_str() { "none" => (false, false), @@ -93,6 +98,7 @@ impl Configuration { x => die!("{}: Invalid value for --reseal option. 
Use --help for more information.", x) }; MinerOptions { + new_work_notify: self.work_notify(), force_sealing: self.args.flag_force_sealing, reseal_on_external_tx: ext, reseal_on_own_tx: own, @@ -106,6 +112,7 @@ impl Configuration { }, reseal_min_period: Duration::from_millis(self.args.flag_reseal_min_period), work_queue_size: self.args.flag_work_queue_size, + enable_resubmission: !self.args.flag_remove_solved, } } @@ -175,7 +182,7 @@ impl Configuration { let wei_per_usd: f32 = 1.0e18 / usd_per_eth; let gas_per_tx: f32 = 21000.0; let wei_per_gas: f32 = wei_per_usd * usd_per_tx / gas_per_tx; - info!("Using a conversion rate of Ξ1 = US${} ({} wei/gas)", usd_per_eth, wei_per_gas); + info!("Using a conversion rate of Ξ1 = {} ({} wei/gas)", paint(White.bold(), format!("US${}", usd_per_eth)), paint(Yellow.bold(), format!("{}", wei_per_gas))); U256::from_dec_str(&format!("{:.0}", wei_per_gas)).unwrap() } } diff --git a/parity/main.rs b/parity/main.rs index 047338bc8..d466987ef 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -80,7 +80,7 @@ use std::thread::sleep; use std::time::Duration; use rustc_serialize::hex::FromHex; use ctrlc::CtrlC; -use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError}; +use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError, paint, Colour, version}; use util::panics::{MayPanic, ForwardPanic, PanicHandler}; use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path}; use ethcore::error::{Error, ImportError}; @@ -184,10 +184,12 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let panic_handler = PanicHandler::new_in_arc(); // Setup logging - let logger = setup_log::setup_log(&conf.args.flag_logging); + let logger = setup_log::setup_log(&conf.args.flag_logging, conf.have_color()); // Raise fdlimit unsafe { ::fdlimit::raise_fd_limit(); } + info!("Starting {}", paint(Colour::White.bold(), format!("{}", version()))); + let net_settings = conf.net_settings(&spec); let sync_config = conf.sync_config(&spec); @@ -320,6 +322,8 @@ fn execute_export(conf: Configuration) { // Setup panic handler let panic_handler = PanicHandler::new_in_arc(); + // Setup logging + let _logger = setup_log::setup_log(&conf.args.flag_logging, conf.have_color()); // Raise fdlimit unsafe { ::fdlimit::raise_fd_limit(); } @@ -392,6 +396,8 @@ fn execute_import(conf: Configuration) { // Setup panic handler let panic_handler = PanicHandler::new_in_arc(); + // Setup logging + let _logger = setup_log::setup_log(&conf.args.flag_logging, conf.have_color()); // Raise fdlimit unsafe { ::fdlimit::raise_fd_limit(); } diff --git a/parity/setup_log.rs b/parity/setup_log.rs index 4ed153fc2..d347a6bf0 100644 --- a/parity/setup_log.rs +++ b/parity/setup_log.rs @@ -19,10 +19,10 @@ use std::env; use std::sync::Arc; use time; use env_logger::LogBuilder; -use util::{RotatingLogger}; +use util::RotatingLogger; /// Sets up the logger -pub fn setup_log(init: &Option) -> Arc { +pub fn setup_log(init: &Option, enable_color: bool) -> Arc { use rlog::*; let mut levels = String::new(); @@ -43,7 +43,7 @@ pub fn setup_log(init: &Option) -> Arc { builder.parse(s); } - let logs = Arc::new(RotatingLogger::new(levels)); + let logs = Arc::new(RotatingLogger::new(levels, enable_color)); let logger = logs.clone(); let format = move |record: &LogRecord| { let timestamp = time::strftime("%Y-%m-%d %H:%M:%S %Z", &time::now()).unwrap(); diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 05dc89564..8bfc661e3 100644 --- 
a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -498,7 +498,7 @@ impl Eth for EthClient where let pow_hash = b.hash(); let target = Ethash::difficulty_to_boundary(b.block().header().difficulty()); let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(b.block().header().number()); - to_value(&(pow_hash, H256::from_slice(&seed_hash[..]), target)) + to_value(&(pow_hash, H256::from_slice(&seed_hash[..]), target, &U256::from(b.block().header().number()))) }).unwrap_or(Err(Error::internal_error())) // no work found. }, _ => Err(Error::invalid_params()) diff --git a/rpc/src/v1/tests/eth.rs b/rpc/src/v1/tests/eth.rs index 59d06e84a..2965a62d2 100644 --- a/rpc/src/v1/tests/eth.rs +++ b/rpc/src/v1/tests/eth.rs @@ -52,6 +52,7 @@ fn sync_provider() -> Arc { fn miner_service(spec: Spec, accounts: Arc) -> Arc { Miner::new( MinerOptions { + new_work_notify: vec![], force_sealing: true, reseal_on_external_tx: true, reseal_on_own_tx: true, @@ -60,6 +61,7 @@ fn miner_service(spec: Spec, accounts: Arc) -> Arc { pending_set: PendingSet::SealingOrElseQueue, reseal_min_period: Duration::from_secs(0), work_queue_size: 50, + enable_resubmission: true, }, spec, Some(accounts) diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index 5b88e8756..cbdddc2b0 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -32,7 +32,7 @@ fn client_service() -> Arc { } fn logger() -> Arc { - Arc::new(RotatingLogger::new("rpc=trace".to_owned())) + Arc::new(RotatingLogger::new("rpc=trace".to_owned(), false)) } fn settings() -> Arc { diff --git a/signer/src/authcode_store.rs b/signer/src/authcode_store.rs index 92e86a73e..e85633d2c 100644 --- a/signer/src/authcode_store.rs +++ b/signer/src/authcode_store.rs @@ -120,7 +120,7 @@ impl AuthCodes { .filter_map(|f| String::from_utf8(f.to_vec()).ok()) .collect::>() .join("-"); - info!(target: "signer", "New authentication token generated."); + trace!(target: "signer", "New authentication token generated."); self.codes.push(code); Ok(readable_code) } diff --git a/sync/src/chain.rs b/sync/src/chain.rs index aa3657419..1897b02f4 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1231,6 +1231,14 @@ impl ChainSync { rlp_stream.out() } + /// creates latest block rlp for the given client + fn create_new_block_rlp(chain: &BlockChainClient, hash: &H256) -> Bytes { + let mut rlp_stream = RlpStream::new_list(2); + rlp_stream.append_raw(&chain.block(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed"), 1); + rlp_stream.append(&chain.block_total_difficulty(BlockID::Hash(hash.clone())).expect("Block has just been sealed; qed.")); + rlp_stream.out() + } + /// returns peer ids that have less blocks than our chain fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { let latest_hash = chain_info.best_block_hash; @@ -1250,7 +1258,6 @@ impl ChainSync { .collect::>() } - fn select_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> Vec<(PeerId, BlockNumber)> { use rand::Rng; let mut lagging_peers = self.get_lagging_peers(chain_info, io); @@ -1263,13 +1270,24 @@ impl ChainSync { } /// propagates latest block to lagging peers - fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { - let lucky_peers = self.select_lagging_peers(chain_info, io); + fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo, sealed: &[H256]) -> usize { + let lucky_peers: Vec<_> = if 
sealed.is_empty() { + self.select_lagging_peers(chain_info, io).iter().map(|&(id, _)| id).collect() + } else { + self.peers.keys().cloned().collect() + }; trace!(target: "sync", "Sending NewBlocks to {:?}", lucky_peers); let mut sent = 0; - for (peer_id, _) in lucky_peers { - let rlp = ChainSync::create_latest_block_rlp(io.chain()); - self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + for peer_id in lucky_peers { + if sealed.is_empty() { + let rlp = ChainSync::create_latest_block_rlp(io.chain()); + self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + } else { + for h in sealed { + let rlp = ChainSync::create_new_block_rlp(io.chain(), h); + self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); + } + } self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number); sent += 1; @@ -1346,11 +1364,11 @@ impl ChainSync { sent } - fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { + fn propagate_latest_blocks(&mut self, io: &mut SyncIo, sealed: &[H256]) { let chain_info = io.chain().chain_info(); if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { let hashes = self.propagate_new_hashes(&chain_info, io); - let blocks = self.propagate_blocks(&chain_info, io); + let blocks = self.propagate_blocks(&chain_info, io, sealed); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); } @@ -1365,10 +1383,10 @@ impl ChainSync { } /// called when block is imported to chain, updates transactions queue and propagates the blocks - pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) { + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256]) { if io.is_chain_queue_empty() { // Propagate latests blocks - self.propagate_latest_blocks(io); + self.propagate_latest_blocks(io, sealed); } if !invalid.is_empty() { trace!(target: "sync", "Bad blocks in the queue, restarting"); @@ -1637,7 +1655,26 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagate_blocks(&chain_info, &mut io); + let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[]); + + // 1 message should be send + assert_eq!(1, io.queue.len()); + // 1 peer should be updated + assert_eq!(1, peer_count); + // NEW_BLOCK_PACKET + assert_eq!(0x07, io.queue[0].packet_id); + } + + #[test] + fn sends_sealed_block() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, EachBlockWith::Uncle); + let mut queue = VecDeque::new(); + let hash = client.block_hash(BlockID::Number(99)).unwrap(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5), &client); + let chain_info = client.chain_info(); + let mut io = TestIo::new(&mut client, &mut queue, None); + let peer_count = sync.propagate_blocks(&chain_info, &mut io, &[hash.clone()]); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1761,7 +1798,7 @@ mod tests { let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagate_blocks(&chain_info, &mut io); + sync.propagate_blocks(&chain_info, &mut io, &[]); let data = 
&io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(data)); @@ -1794,7 +1831,7 @@ mod tests { let mut queue = VecDeque::new(); let mut io = TestIo::new(&mut client, &mut queue, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks); - sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); + sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]); assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0); assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 1); } @@ -1808,7 +1845,7 @@ mod tests { let mut queue = VecDeque::new(); let mut io = TestIo::new(&mut client, &mut queue, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks); - sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks); + sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]); } // then @@ -1833,10 +1870,10 @@ mod tests { let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks); + sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[]); assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0); assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 0); - sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks); + sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks, &[]); // then let status = io.chain.miner.status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 9bd10cb95..fa26e7d85 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -196,9 +196,9 @@ impl NetworkProtocolHandler for EthSync { #[cfg_attr(feature="dev", allow(single_match))] fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => { + SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, ref sealed } => { let mut sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted); + self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, sealed); }, _ => {/* Ignore other messages */}, } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 831976048..9a9afca49 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -173,6 +173,6 @@ impl TestNet { pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]); + peer.sync.write().unwrap().chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], &[]); } } diff --git a/util/Cargo.toml b/util/Cargo.toml index 2c7dc8d68..b50ecbab5 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -37,6 +37,7 @@ vergen = "0.1" target_info = "0.1" bigint = { path = "bigint" } chrono = "0.2" +ansi_term = "0.7" [features] default = [] diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index aeb8b9fa7..8a19e48bf 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -99,7 +99,7 @@ impl DatabaseConfig { DatabaseConfig { cache_size: Some(cache_size), prefix_size: None, - max_open_files: -1, + max_open_files: 256, compaction: CompactionProfile::default(), } } @@ -122,7 +122,7 @@ impl Default for DatabaseConfig { 
DatabaseConfig { cache_size: None, prefix_size: None, - max_open_files: -1, + max_open_files: 256, compaction: CompactionProfile::default(), } } diff --git a/util/src/lib.rs b/util/src/lib.rs index adaf08e77..31e072dc7 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -117,6 +117,7 @@ extern crate libc; extern crate target_info; extern crate bigint; extern crate chrono; +extern crate ansi_term; pub mod standard; #[macro_use] diff --git a/util/src/log.rs b/util/src/log.rs index 172957c13..1dddae1cb 100644 --- a/util/src/log.rs +++ b/util/src/log.rs @@ -20,7 +20,21 @@ use std::env; use rlog::{LogLevelFilter}; use env_logger::LogBuilder; use std::sync::{RwLock, RwLockReadGuard}; +use std::sync::atomic::{Ordering, AtomicBool}; use arrayvec::ArrayVec; +pub use ansi_term::{Colour, Style}; + +lazy_static! { + static ref USE_COLOR: AtomicBool = AtomicBool::new(false); +} + +/// Paint, using colour if desired. +pub fn paint(c: Style, t: String) -> String { + match USE_COLOR.load(Ordering::Relaxed) { + true => format!("{}", c.paint(t)), + false => t, + } +} lazy_static! { static ref LOG_DUMMY: bool = { @@ -57,7 +71,8 @@ impl RotatingLogger { /// Creates new `RotatingLogger` with given levels. /// It does not enforce levels - it's just read only. - pub fn new(levels: String) -> Self { + pub fn new(levels: String, enable_color: bool) -> Self { + USE_COLOR.store(enable_color, Ordering::Relaxed); RotatingLogger { levels: levels, logs: RwLock::new(ArrayVec::<[_; LOG_SIZE]>::new()), @@ -86,7 +101,7 @@ mod test { use super::RotatingLogger; fn logger() -> RotatingLogger { - RotatingLogger::new("test".to_owned()) + RotatingLogger::new("test".to_owned(), false) } #[test] diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 46185482f..a48d1544c 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -32,6 +32,8 @@ use misc::version; use crypto::*; use sha3::Hashable; use rlp::*; +use log::Colour::White; +use log::paint; use network::session::{Session, SessionData}; use error::*; use io::*; @@ -343,6 +345,7 @@ pub struct Host where Message: Send + Sync + Clone { reserved_nodes: RwLock>, num_sessions: AtomicUsize, stopping: AtomicBool, + first_time: AtomicBool, } impl Host where Message: Send + Sync + Clone { @@ -398,6 +401,7 @@ impl Host where Message: Send + Sync + Clone { reserved_nodes: RwLock::new(HashSet::new()), num_sessions: AtomicUsize::new(0), stopping: AtomicBool::new(false), + first_time: AtomicBool::new(true), }; for n in boot_nodes { @@ -533,7 +537,11 @@ impl Host where Message: Send + Sync + Clone { }; self.info.write().unwrap().public_endpoint = Some(public_endpoint.clone()); - info!("Public node URL: {}", self.external_url().unwrap()); + + if self.first_time.load(AtomicOrdering::Relaxed) { + info!("Public node URL: {}", paint(White.bold(), format!("{}", self.external_url().unwrap()))); + self.first_time.store(false, AtomicOrdering::Relaxed); + } // Initialize discovery. let discovery = { diff --git a/util/src/network/tests.rs b/util/src/network/tests.rs index 861edc144..cd3f48d9a 100644 --- a/util/src/network/tests.rs +++ b/util/src/network/tests.rs @@ -88,7 +88,7 @@ impl NetworkProtocolHandler for TestProtocol { /// Timer function called after a timeout created with `NetworkContext::timeout`. 
fn timeout(&self, io: &NetworkContext, timer: TimerToken) { - io.message(TestProtocolMessage { payload: 22 }); + io.message(TestProtocolMessage { payload: 22 }).unwrap(); assert_eq!(timer, 0); self.got_timeout.store(true, AtomicOrdering::Relaxed); } diff --git a/util/src/using_queue.rs b/util/src/using_queue.rs index a5a2b0465..e5e1a5a58 100644 --- a/util/src/using_queue.rs +++ b/util/src/using_queue.rs @@ -27,6 +27,14 @@ pub struct UsingQueue where T: Clone { max_size: usize, } +/// Take an item or just clone it? +pub enum GetAction { + /// Remove the item, faster but you can't get it back. + Take, + /// Clone the item, slower but you can get it again. + Clone, +} + impl UsingQueue where T: Clone { /// Create a new struct with a maximum size of `max_size`. pub fn new(max_size: usize) -> UsingQueue { @@ -74,6 +82,20 @@ impl UsingQueue where T: Clone { self.in_use.iter().position(|r| predicate(r)).map(|i| self.in_use.remove(i)) } + /// Returns `Some` item which is the first that `f` returns `true` with a reference to it + /// as a parameter or `None` if no such item exists in the queue. + pub fn clone_used_if
<P>
(&mut self, predicate: P) -> Option<T> where P: Fn(&T) -> bool { self.in_use.iter().find(|r| predicate(r)).cloned() } + + /// Fork-function for `take_used_if` and `clone_used_if`. + pub fn get_used_if
<P>
(&mut self, action: GetAction, predicate: P) -> Option where P: Fn(&T) -> bool { + match action { + GetAction::Take => self.take_used_if(predicate), + GetAction::Clone => self.clone_used_if(predicate), + } + } + /// Returns the most recently pushed block if `f` returns `true` with a reference to it as /// a parameter, otherwise `None`. /// Will not destroy a block if a reference to it has previously been returned by `use_last_ref`, @@ -94,18 +116,66 @@ impl UsingQueue where T: Clone { } #[test] -fn should_find_when_pushed() { +fn should_not_find_when_pushed() { let mut q = UsingQueue::new(2); q.push(1); assert!(q.take_used_if(|i| i == &1).is_none()); } +#[test] +fn should_not_find_when_pushed_with_clone() { + let mut q = UsingQueue::new(2); + q.push(1); + assert!(q.clone_used_if(|i| i == &1).is_none()); +} + #[test] fn should_find_when_pushed_and_used() { let mut q = UsingQueue::new(2); q.push(1); q.use_last_ref(); - assert!(q.take_used_if(|i| i == &1).is_some()); + assert!(q.take_used_if(|i| i == &1).unwrap() == 1); +} + +#[test] +fn should_have_same_semantics_for_get_take_clone() { + let mut q = UsingQueue::new(2); + q.push(1); + assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none()); + assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none()); + q.use_last_ref(); + assert!(q.get_used_if(GetAction::Clone, |i| i == &1).unwrap() == 1); + assert!(q.get_used_if(GetAction::Clone, |i| i == &1).unwrap() == 1); + assert!(q.get_used_if(GetAction::Take, |i| i == &1).unwrap() == 1); + assert!(q.get_used_if(GetAction::Clone, |i| i == &1).is_none()); + assert!(q.get_used_if(GetAction::Take, |i| i == &1).is_none()); +} + +#[test] +fn should_find_when_pushed_and_used_with_clone() { + let mut q = UsingQueue::new(2); + q.push(1); + q.use_last_ref(); + assert!(q.clone_used_if(|i| i == &1).unwrap() == 1); +} + +#[test] +fn should_not_find_again_when_pushed_and_taken() { + let mut q = UsingQueue::new(2); + q.push(1); + q.use_last_ref(); + assert!(q.take_used_if(|i| i == &1).unwrap() == 1); + assert!(q.clone_used_if(|i| i == &1).is_none()); +} + +#[test] +fn should_find_again_when_pushed_and_cloned() { + let mut q = UsingQueue::new(2); + q.push(1); + q.use_last_ref(); + assert!(q.clone_used_if(|i| i == &1).unwrap() == 1); + assert!(q.clone_used_if(|i| i == &1).unwrap() == 1); + assert!(q.take_used_if(|i| i == &1).unwrap() == 1); } #[test]
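// Editor's aside (hedged, not part of the patch): these `UsingQueue` tests pin down the
// behaviour `submit_seal` relies on — with resubmission enabled a found work package is
// cloned and can be matched again, otherwise it is taken and a second solution for the
// same pow_hash is rejected. A sketch of that decision, with names as used in this diff:
fn resubmission_action(enable_resubmission: bool) -> GetAction {
    // `--remove-solved` on the CLI sets `enable_resubmission: false`, i.e. GetAction::Take.
    if enable_resubmission { GetAction::Clone } else { GetAction::Take }
}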