From b3f37f3cb46300ee2a10e790b8fb22c1902db7d3 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 29 Jun 2016 20:07:21 +0200 Subject: [PATCH] HTTP work notifier --- Cargo.lock | 7 ++- ethcore/Cargo.toml | 4 ++ ethcore/src/ethereum/ethash.rs | 4 +- ethcore/src/lib.rs | 2 + ethcore/src/miner/miner.rs | 37 ++++++++--- ethcore/src/miner/mod.rs | 1 + ethcore/src/miner/work_notify.rs | 105 +++++++++++++++++++++++++++++++ parity/cli.rs | 2 +- parity/configuration.rs | 4 +- 9 files changed, 148 insertions(+), 18 deletions(-) create mode 100644 ethcore/src/miner/work_notify.rs diff --git a/Cargo.lock b/Cargo.lock index 87e66a516..0a2480ac1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,6 +261,7 @@ dependencies = [ "ethjson 0.1.0", "ethstore 0.1.0", "heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.9.4 (git+https://github.com/ethcore/hyper)", "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -547,7 +548,7 @@ dependencies = [ [[package]] name = "hyper" version = "0.9.4" -source = "git+https://github.com/ethcore/hyper#7ccfcb2aa7e6aa6300efa8cebd6a0e6ce55582ea" +source = "git+https://github.com/ethcore/hyper#9e346c1d4bc30cd4142dea9d8a0b117d30858ca4" dependencies = [ "cookie 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -556,7 +557,7 @@ dependencies = [ "mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "rotor 0.6.3 (git+https://github.com/ethcore/rotor)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", - "spmc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", "traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1222,7 +1223,7 @@ dependencies = [ [[package]] name = "spmc" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] diff --git a/ethcore/Cargo.toml b/ethcore/Cargo.toml index 2b56bf581..2d94aebab 100644 --- a/ethcore/Cargo.toml +++ b/ethcore/Cargo.toml @@ -32,6 +32,10 @@ bloomchain = "0.1" rayon = "0.3.1" ethstore = { path = "../ethstore" } +[dependencies.hyper] +git = "https://github.com/ethcore/hyper" +default-features = false + [features] jit = ["evmjit"] evm-debug = [] diff --git a/ethcore/src/ethereum/ethash.rs b/ethcore/src/ethereum/ethash.rs index 3400220db..84c2a9608 100644 --- a/ethcore/src/ethereum/ethash.rs +++ b/ethcore/src/ethereum/ethash.rs @@ -14,9 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -extern crate ethash; - -use self::ethash::{quick_get_difficulty, EthashManager, H256 as EH256}; +use ethash::{quick_get_difficulty, EthashManager, H256 as EH256}; use common::*; use block::*; use spec::CommonParams; diff --git a/ethcore/src/lib.rs b/ethcore/src/lib.rs index 9919ec62a..54a944331 100644 --- a/ethcore/src/lib.rs +++ b/ethcore/src/lib.rs @@ -91,6 +91,8 @@ extern crate ethjson; extern crate bloomchain; #[macro_use] extern crate ethcore_ipc as ipc; extern crate rayon; +extern crate hyper; +extern crate ethash; pub extern crate ethstore; #[cfg(test)] extern crate ethcore_devtools as devtools; diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index d91a13b96..18202b1f2 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -29,6 +29,7 @@ use receipt::{Receipt}; use spec::Spec; use engine::Engine; use miner::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin}; +use miner::work_notify::WorkPoster; /// Different possible definitions for pending transaction set. #[derive(Debug)] @@ -98,6 +99,7 @@ pub struct Miner { spec: Spec, accounts: Option>, + work_poster: Option, } impl Miner { @@ -115,15 +117,18 @@ impl Miner { extra_data: RwLock::new(Vec::new()), accounts: None, spec: spec, + work_poster: None, } } /// Creates new instance of miner pub fn new(options: MinerOptions, spec: Spec, accounts: Option>) -> Arc { + let work_poster = if !options.new_work_notify.is_empty() { Some(WorkPoster::new(&options.new_work_notify)) } else { None }; Arc::new(Miner { transaction_queue: Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)), - sealing_enabled: AtomicBool::new(options.force_sealing), + sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()), next_allowed_reseal: Mutex::new(Instant::now()), + options: options, sealing_block_last_request: Mutex::new(0), sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)), gas_range_target: RwLock::new((U256::zero(), U256::zero())), @@ -132,6 +137,7 @@ impl Miner { accounts: accounts, options: options, spec: spec, + work_poster: work_poster, }) } @@ -139,6 +145,10 @@ impl Miner { self.spec.engine.deref() } + fn forced_sealing(&self) -> bool { + self.options.force_sealing || !self.options.new_work_notify.is_empty() + } + /// Prepares new block for sealing including top transactions from queue. #[cfg_attr(feature="dev", allow(match_same_arms))] #[cfg_attr(feature="dev", allow(cyclomatic_complexity))] @@ -240,13 +250,22 @@ impl Miner { } } - let mut sealing_work = self.sealing_work.lock().unwrap(); - if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) { - trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); - sealing_work.push(block); - } - - trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash())); + let work = { + let mut sealing_work = self.sealing_work.lock().unwrap(); + let work = if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) { + trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash()); + let pow_hash = block.block().fields().header.hash(); + let number = block.block().fields().header.number(); + let difficulty = *block.block().fields().header.difficulty(); + sealing_work.push(block); + Some((pow_hash, difficulty, number)) + } else { + None + }; + trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash())); + work + }; + work.map(|(pow_hash, difficulty, number)| self.work_poster.as_ref().map(|ref p| p.notify(pow_hash, difficulty, number))); } fn update_gas_limit(&self, chain: &MiningBlockChainClient) { @@ -565,7 +584,7 @@ impl MinerService for Miner { let current_no = chain.chain_info().best_block_number; let has_local_transactions = self.transaction_queue.lock().unwrap().has_local_pending_transactions(); let last_request = *self.sealing_block_last_request.lock().unwrap(); - let should_disable_sealing = !self.options.force_sealing + let should_disable_sealing = !self.forced_sealing() && !has_local_transactions && current_no > last_request && current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS; diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index e65d6048a..152bd1a61 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -45,6 +45,7 @@ mod miner; mod external; mod transaction_queue; +mod work_notify; pub use self::transaction_queue::{TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin}; pub use self::miner::{Miner, MinerOptions, PendingSet}; diff --git a/ethcore/src/miner/work_notify.rs b/ethcore/src/miner/work_notify.rs new file mode 100644 index 000000000..6144e2d3d --- /dev/null +++ b/ethcore/src/miner/work_notify.rs @@ -0,0 +1,105 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +extern crate hyper; + +use hyper::header::ContentType; +use hyper::method::Method; +use hyper::client::{Request, Response, Client}; +use hyper::{Next}; +use hyper::net::HttpStream; +use ethash::SeedHashCompute; +use hyper::Url; +use util::*; +use ethereum::ethash::Ethash; + +pub struct WorkPoster { + urls: Vec, + client: Mutex>, + seed_compute: Mutex, +} + +impl WorkPoster { + pub fn new(urls: &[String]) -> Self { + let urls = urls.into_iter().filter_map(|u| { + match Url::parse(&u) { + Ok(url) => Some(url), + Err(e) => { + warn!("Error parsing URL {} : {}", u, e); + None + } + } + }).collect(); + let client = Client::::configure() + .keep_alive(false) + .build().expect("Error creating HTTP client"); + WorkPoster { + client: Mutex::new(client), + urls: urls, + seed_compute: Mutex::new(SeedHashCompute::new()), + } + } + + pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) { + // TODO: move this to engine + let target = Ethash::difficulty_to_boundary(&difficulty); + let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(number); + let seed_hash = H256::from_slice(&seed_hash[..]); + let body = format!(r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#, + pow_hash.hex(), seed_hash.hex(), target.hex(), number); + let client = self.client.lock().unwrap(); + for u in &self.urls { + if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) { + warn!("Error sending HTTP notification to {} : {}", u, e); + } + } + } +} + +struct PostHandler { + body: String, +} + +impl hyper::client::Handler for PostHandler { + fn on_request(&mut self, request: &mut Request) -> Next { + request.set_method(Method::Post); + request.headers_mut().set(ContentType::json()); + Next::write() + } + + fn on_request_writable(&mut self, encoder: &mut hyper::Encoder) -> Next { + if let Err(e) = encoder.write_all(self.body.as_bytes()) { + trace!("Error posting work data: {}", e); + } + encoder.close(); + Next::read() + + } + + fn on_response(&mut self, _response: Response) -> Next { + Next::end() + } + + fn on_response_readable(&mut self, _decoder: &mut hyper::Decoder) -> Next { + Next::end() + } + + fn on_error(&mut self, err: hyper::Error) -> Next { + trace!("Error posting work data: {}", err); + Next::end() + } +} + diff --git a/parity/cli.rs b/parity/cli.rs index e91acc5c6..3c1eee8e8 100644 --- a/parity/cli.rs +++ b/parity/cli.rs @@ -322,7 +322,7 @@ pub struct Args { pub flag_gas_cap: String, pub flag_extra_data: Option, pub flag_tx_queue_size: usize, - pub flag_work_notify: Option, + pub flag_notify_work: Option, pub flag_logging: Option, pub flag_version: bool, pub flag_from: String, diff --git a/parity/configuration.rs b/parity/configuration.rs index ffdf44126..0cfb7c44f 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -84,8 +84,8 @@ impl Configuration { ) } - pub fn work_notify(&self) -> Vec { - self.args.flag_work_notify.as_ref().map_or_else(Vec::new, |s| s.split(',').map(|s| s.to_owned()).collect()) + fn work_notify(&self) -> Vec { + self.args.flag_notify_work.as_ref().map_or_else(Vec::new, |s| s.split(',').map(|s| s.to_owned()).collect()) } pub fn miner_options(&self) -> MinerOptions {