Merge pull request #1491 from ethcore/work-notify

Work notification over HTTP
This commit is contained in:
Arkadiy Paronyan 2016-06-30 08:19:45 +02:00 committed by GitHub
commit 60270083e5
12 changed files with 159 additions and 18 deletions

7
Cargo.lock generated
View File

@ -261,6 +261,7 @@ dependencies = [
"ethjson 0.1.0",
"ethstore 0.1.0",
"heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.9.4 (git+https://github.com/ethcore/hyper)",
"lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
@ -547,7 +548,7 @@ dependencies = [
[[package]]
name = "hyper"
version = "0.9.4"
source = "git+https://github.com/ethcore/hyper#7ccfcb2aa7e6aa6300efa8cebd6a0e6ce55582ea"
source = "git+https://github.com/ethcore/hyper#9e346c1d4bc30cd4142dea9d8a0b117d30858ca4"
dependencies = [
"cookie 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -556,7 +557,7 @@ dependencies = [
"mime 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rotor 0.6.3 (git+https://github.com/ethcore/rotor)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"spmc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"spmc 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"traitobject 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"typeable 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1222,7 +1223,7 @@ dependencies = [
[[package]]
name = "spmc"
version = "0.2.0"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]

View File

@ -32,6 +32,10 @@ bloomchain = "0.1"
rayon = "0.3.1"
ethstore = { path = "../ethstore" }
[dependencies.hyper]
git = "https://github.com/ethcore/hyper"
default-features = false
[features]
jit = ["evmjit"]
evm-debug = []

View File

@ -14,9 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate ethash;
use self::ethash::{quick_get_difficulty, EthashManager, H256 as EH256};
use ethash::{quick_get_difficulty, EthashManager, H256 as EH256};
use common::*;
use block::*;
use spec::CommonParams;

View File

@ -91,6 +91,8 @@ extern crate ethjson;
extern crate bloomchain;
#[macro_use] extern crate ethcore_ipc as ipc;
extern crate rayon;
extern crate hyper;
extern crate ethash;
pub extern crate ethstore;
#[cfg(test)] extern crate ethcore_devtools as devtools;

View File

@ -29,6 +29,7 @@ use receipt::{Receipt};
use spec::Spec;
use engine::Engine;
use miner::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
use miner::work_notify::WorkPoster;
/// Different possible definitions for pending transaction set.
#[derive(Debug)]
@ -45,6 +46,8 @@ pub enum PendingSet {
/// Configures the behaviour of the miner.
#[derive(Debug)]
pub struct MinerOptions {
/// URLs to notify when there is new work.
pub new_work_notify: Vec<String>,
/// Force the miner to reseal, even when nobody has asked for work.
pub force_sealing: bool,
/// Reseal on receipt of new external transactions.
@ -66,6 +69,7 @@ pub struct MinerOptions {
impl Default for MinerOptions {
fn default() -> Self {
MinerOptions {
new_work_notify: vec![],
force_sealing: false,
reseal_on_external_tx: true,
reseal_on_own_tx: true,
@ -95,6 +99,7 @@ pub struct Miner {
spec: Spec,
accounts: Option<Arc<AccountProvider>>,
work_poster: Option<WorkPoster>,
}
impl Miner {
@ -112,23 +117,26 @@ impl Miner {
extra_data: RwLock::new(Vec::new()),
accounts: None,
spec: spec,
work_poster: None,
}
}
/// Creates new instance of miner
pub fn new(options: MinerOptions, spec: Spec, accounts: Option<Arc<AccountProvider>>) -> Arc<Miner> {
let work_poster = if !options.new_work_notify.is_empty() { Some(WorkPoster::new(&options.new_work_notify)) } else { None };
Arc::new(Miner {
transaction_queue: Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)),
sealing_enabled: AtomicBool::new(options.force_sealing),
sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()),
next_allowed_reseal: Mutex::new(Instant::now()),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)),
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
accounts: accounts,
options: options,
accounts: accounts,
spec: spec,
work_poster: work_poster,
})
}
@ -136,6 +144,10 @@ impl Miner {
self.spec.engine.deref()
}
fn forced_sealing(&self) -> bool {
self.options.force_sealing || !self.options.new_work_notify.is_empty()
}
/// Prepares new block for sealing including top transactions from queue.
#[cfg_attr(feature="dev", allow(match_same_arms))]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
@ -237,13 +249,22 @@ impl Miner {
}
}
let work = {
let mut sealing_work = self.sealing_work.lock().unwrap();
if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) {
let work = if sealing_work.peek_last_ref().map_or(true, |pb| pb.block().fields().header.hash() != block.block().fields().header.hash()) {
trace!(target: "miner", "Pushing a new, refreshed or borrowed pending {}...", block.block().fields().header.hash());
let pow_hash = block.block().fields().header.hash();
let number = block.block().fields().header.number();
let difficulty = *block.block().fields().header.difficulty();
sealing_work.push(block);
}
Some((pow_hash, difficulty, number))
} else {
None
};
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash()));
work
};
work.map(|(pow_hash, difficulty, number)| self.work_poster.as_ref().map(|ref p| p.notify(pow_hash, difficulty, number)));
}
fn update_gas_limit(&self, chain: &MiningBlockChainClient) {
@ -562,7 +583,7 @@ impl MinerService for Miner {
let current_no = chain.chain_info().best_block_number;
let has_local_transactions = self.transaction_queue.lock().unwrap().has_local_pending_transactions();
let last_request = *self.sealing_block_last_request.lock().unwrap();
let should_disable_sealing = !self.options.force_sealing
let should_disable_sealing = !self.forced_sealing()
&& !has_local_transactions
&& current_no > last_request
&& current_no - last_request > SEALING_TIMEOUT_IN_BLOCKS;

View File

@ -45,6 +45,7 @@
mod miner;
mod external;
mod transaction_queue;
mod work_notify;
pub use self::transaction_queue::{TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
pub use self::miner::{Miner, MinerOptions, PendingSet};

View File

@ -0,0 +1,105 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate hyper;
use hyper::header::ContentType;
use hyper::method::Method;
use hyper::client::{Request, Response, Client};
use hyper::{Next};
use hyper::net::HttpStream;
use ethash::SeedHashCompute;
use hyper::Url;
use util::*;
use ethereum::ethash::Ethash;
pub struct WorkPoster {
urls: Vec<Url>,
client: Mutex<Client<PostHandler>>,
seed_compute: Mutex<SeedHashCompute>,
}
impl WorkPoster {
pub fn new(urls: &[String]) -> Self {
let urls = urls.into_iter().filter_map(|u| {
match Url::parse(&u) {
Ok(url) => Some(url),
Err(e) => {
warn!("Error parsing URL {} : {}", u, e);
None
}
}
}).collect();
let client = Client::<PostHandler>::configure()
.keep_alive(false)
.build().expect("Error creating HTTP client");
WorkPoster {
client: Mutex::new(client),
urls: urls,
seed_compute: Mutex::new(SeedHashCompute::new()),
}
}
pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) {
// TODO: move this to engine
let target = Ethash::difficulty_to_boundary(&difficulty);
let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(number);
let seed_hash = H256::from_slice(&seed_hash[..]);
let body = format!(r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#,
pow_hash.hex(), seed_hash.hex(), target.hex(), number);
let client = self.client.lock().unwrap();
for u in &self.urls {
if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) {
warn!("Error sending HTTP notification to {} : {}", u, e);
}
}
}
}
struct PostHandler {
body: String,
}
impl hyper::client::Handler<HttpStream> for PostHandler {
fn on_request(&mut self, request: &mut Request) -> Next {
request.set_method(Method::Post);
request.headers_mut().set(ContentType::json());
Next::write()
}
fn on_request_writable(&mut self, encoder: &mut hyper::Encoder<HttpStream>) -> Next {
if let Err(e) = encoder.write_all(self.body.as_bytes()) {
trace!("Error posting work data: {}", e);
}
encoder.close();
Next::read()
}
fn on_response(&mut self, _response: Response) -> Next {
Next::end()
}
fn on_response_readable(&mut self, _decoder: &mut hyper::Decoder<HttpStream>) -> Next {
Next::end()
}
fn on_error(&mut self, err: hyper::Error) -> Next {
trace!("Error posting work data: {}", err);
Next::end()
}
}

View File

@ -169,6 +169,8 @@ Sealing/Mining Options:
more than 32 characters.
--tx-queue-size LIMIT Maximum amount of transactions in the queue (waiting
to be included in next block) [default: 1024].
--notify-work URLS URLs to which work package notifications are pushed.
URLS should be a comma-delimited list of HTTP URLs.
Footprint Options:
--tracing BOOL Indicates if full transaction tracing should be
@ -320,6 +322,7 @@ pub struct Args {
pub flag_gas_cap: String,
pub flag_extra_data: Option<String>,
pub flag_tx_queue_size: usize,
pub flag_notify_work: Option<String>,
pub flag_logging: Option<String>,
pub flag_version: bool,
pub flag_from: String,

View File

@ -84,6 +84,10 @@ impl Configuration {
)
}
fn work_notify(&self) -> Vec<String> {
self.args.flag_notify_work.as_ref().map_or_else(Vec::new, |s| s.split(',').map(|s| s.to_owned()).collect())
}
pub fn miner_options(&self) -> MinerOptions {
let (own, ext) = match self.args.flag_reseal_on_txs.as_str() {
"none" => (false, false),
@ -93,6 +97,7 @@ impl Configuration {
x => die!("{}: Invalid value for --reseal option. Use --help for more information.", x)
};
MinerOptions {
new_work_notify: self.work_notify(),
force_sealing: self.args.flag_force_sealing,
reseal_on_external_tx: ext,
reseal_on_own_tx: own,

View File

@ -498,7 +498,7 @@ impl<C, S, M, EM> Eth for EthClient<C, S, M, EM> where
let pow_hash = b.hash();
let target = Ethash::difficulty_to_boundary(b.block().header().difficulty());
let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(b.block().header().number());
to_value(&(pow_hash, H256::from_slice(&seed_hash[..]), target))
to_value(&(pow_hash, H256::from_slice(&seed_hash[..]), target, &U256::from(b.block().header().number())))
}).unwrap_or(Err(Error::internal_error())) // no work found.
},
_ => Err(Error::invalid_params())

View File

@ -60,6 +60,7 @@ fn miner_service(spec: Spec, accounts: Arc<AccountProvider>) -> Arc<Miner> {
pending_set: PendingSet::SealingOrElseQueue,
reseal_min_period: Duration::from_secs(0),
work_queue_size: 50,
new_work_notify: vec![],
},
spec,
Some(accounts)

View File

@ -88,7 +88,7 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
/// Timer function called after a timeout created with `NetworkContext::timeout`.
fn timeout(&self, io: &NetworkContext<TestProtocolMessage>, timer: TimerToken) {
io.message(TestProtocolMessage { payload: 22 });
io.message(TestProtocolMessage { payload: 22 }).unwrap();
assert_eq!(timer, 0);
self.got_timeout.store(true, AtomicOrdering::Relaxed);
}