Merge branch 'work-notify' of github.com:ethcore/parity into work-notify

This commit is contained in:
Gav Wood
2016-06-29 20:14:32 +02:00
14 changed files with 202 additions and 25 deletions

View File

@@ -16,6 +16,7 @@
use rayon::prelude::*;
use std::sync::atomic::AtomicBool;
use std::time::{Instant, Duration};
use util::*;
use account_provider::AccountProvider;
@@ -53,12 +54,16 @@ pub struct MinerOptions {
pub reseal_on_external_tx: bool,
/// Reseal on receipt of new local transactions.
pub reseal_on_own_tx: bool,
/// Minimum period between transaction-inspired reseals.
pub reseal_min_period: Duration,
/// Maximum amount of gas to bother considering for block insertion.
pub tx_gas_limit: U256,
/// Maximum size of the transaction queue.
pub tx_queue_size: usize,
/// Whether we should fallback to providing all the queue's transactions or just pending.
pub pending_set: PendingSet,
/// How many historical work packages can we store before running out?
pub work_queue_size: usize,
}
impl Default for MinerOptions {
@@ -71,6 +76,8 @@ impl Default for MinerOptions {
tx_gas_limit: !U256::zero(),
tx_queue_size: 1024,
pending_set: PendingSet::AlwaysQueue,
reseal_min_period: Duration::from_secs(0),
work_queue_size: 20,
}
}
}
@@ -84,6 +91,7 @@ pub struct Miner {
// for sealing...
options: MinerOptions,
sealing_enabled: AtomicBool,
next_allowed_reseal: Mutex<Instant>,
sealing_block_last_request: Mutex<u64>,
gas_range_target: RwLock<(U256, U256)>,
author: RwLock<Address>,
@@ -101,8 +109,9 @@ impl Miner {
transaction_queue: Mutex::new(TransactionQueue::new()),
options: Default::default(),
sealing_enabled: AtomicBool::new(false),
next_allowed_reseal: Mutex::new(Instant::now()),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(UsingQueue::new(5)),
sealing_work: Mutex::new(UsingQueue::new(20)),
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
@@ -118,12 +127,14 @@ impl Miner {
Arc::new(Miner {
transaction_queue: Mutex::new(TransactionQueue::with_limits(options.tx_queue_size, options.tx_gas_limit)),
sealing_enabled: AtomicBool::new(options.force_sealing || !options.new_work_notify.is_empty()),
options: options,
next_allowed_reseal: Mutex::new(Instant::now()),
sealing_block_last_request: Mutex::new(0),
sealing_work: Mutex::new(UsingQueue::new(5)),
sealing_work: Mutex::new(UsingQueue::new(options.work_queue_size)),
options: options,
gas_range_target: RwLock::new((U256::zero(), U256::zero())),
author: RwLock::new(Address::default()),
extra_data: RwLock::new(Vec::new()),
options: options,
accounts: accounts,
spec: spec,
work_poster: work_poster,
@@ -282,6 +293,9 @@ impl Miner {
// Return if
!have_work
}
/// Are we allowed to do a non-mandatory reseal?
fn tx_reseal_allowed(&self) -> bool { Instant::now() > *self.next_allowed_reseal.lock().unwrap() }
}
const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
@@ -452,7 +466,7 @@ impl MinerService for Miner {
.map(|tx| transaction_queue.add(tx, &fetch_account, TransactionOrigin::External))
.collect()
};
if !results.is_empty() && self.options.reseal_on_external_tx {
if !results.is_empty() && self.options.reseal_on_external_tx && self.tx_reseal_allowed() {
self.update_sealing(chain);
}
results
@@ -487,7 +501,7 @@ impl MinerService for Miner {
import
};
if imported.is_ok() && self.options.reseal_on_own_tx {
if imported.is_ok() && self.options.reseal_on_own_tx && self.tx_reseal_allowed() {
// Make sure to do it after transaction is imported and lock is droped.
// We need to create pending block and enable sealing
let prepared = self.enable_and_prepare_sealing(chain);
@@ -580,6 +594,7 @@ impl MinerService for Miner {
self.sealing_enabled.store(false, atomic::Ordering::Relaxed);
self.sealing_work.lock().unwrap().reset();
} else {
*self.next_allowed_reseal.lock().unwrap() = Instant::now() + self.options.reseal_min_period;
self.prepare_sealing(chain);
}
}

View File

@@ -441,7 +441,7 @@ impl TransactionQueue {
trace!(target: "miner", "Importing: {:?}", tx.hash());
if tx.gas_price < self.minimal_gas_price {
if tx.gas_price < self.minimal_gas_price && origin != TransactionOrigin::Local {
trace!(target: "miner",
"Dropping transaction below minimal gas price threshold: {:?} (gp: {} < {})",
tx.hash(),
@@ -1055,7 +1055,7 @@ mod test {
}
#[test]
fn should_not_import_transaction_below_min_gas_price_threshold() {
fn should_not_import_transaction_below_min_gas_price_threshold_if_external() {
// given
let mut txq = TransactionQueue::new();
let tx = new_tx();
@@ -1074,6 +1074,23 @@ mod test {
assert_eq!(stats.future, 0);
}
#[test]
fn should_import_transaction_below_min_gas_price_threshold_if_local() {
// given
let mut txq = TransactionQueue::new();
let tx = new_tx();
txq.set_minimal_gas_price(tx.gas_price + U256::one());
// when
let res = txq.add(tx, &default_nonce, TransactionOrigin::Local);
// then
assert_eq!(res.unwrap(), TransactionImportResult::Current);
let stats = txq.status();
assert_eq!(stats.pending, 1);
assert_eq!(stats.future, 0);
}
#[test]
fn should_reject_incorectly_signed_transaction() {
// given

View File

@@ -0,0 +1,105 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate hyper;
use hyper::header::ContentType;
use hyper::method::Method;
use hyper::client::{Request, Response, Client};
use hyper::{Next};
use hyper::net::HttpStream;
use ethash::SeedHashCompute;
use hyper::Url;
use util::*;
use ethereum::ethash::Ethash;
pub struct WorkPoster {
urls: Vec<Url>,
client: Mutex<Client<PostHandler>>,
seed_compute: Mutex<SeedHashCompute>,
}
impl WorkPoster {
pub fn new(urls: &[String]) -> Self {
let urls = urls.into_iter().filter_map(|u| {
match Url::parse(&u) {
Ok(url) => Some(url),
Err(e) => {
warn!("Error parsing URL {} : {}", u, e);
None
}
}
}).collect();
let client = Client::<PostHandler>::configure()
.keep_alive(false)
.build().expect("Error creating HTTP client");
WorkPoster {
client: Mutex::new(client),
urls: urls,
seed_compute: Mutex::new(SeedHashCompute::new()),
}
}
pub fn notify(&self, pow_hash: H256, difficulty: U256, number: u64) {
// TODO: move this to engine
let target = Ethash::difficulty_to_boundary(&difficulty);
let seed_hash = &self.seed_compute.lock().unwrap().get_seedhash(number);
let seed_hash = H256::from_slice(&seed_hash[..]);
let body = format!(r#"{{ "result": ["0x{}","0x{}","0x{}","0x{:x}"] }}"#,
pow_hash.hex(), seed_hash.hex(), target.hex(), number);
let client = self.client.lock().unwrap();
for u in &self.urls {
if let Err(e) = client.request(u.clone(), PostHandler { body: body.clone() }) {
warn!("Error sending HTTP notification to {} : {}", u, e);
}
}
}
}
struct PostHandler {
body: String,
}
impl hyper::client::Handler<HttpStream> for PostHandler {
fn on_request(&mut self, request: &mut Request) -> Next {
request.set_method(Method::Post);
request.headers_mut().set(ContentType::json());
Next::write()
}
fn on_request_writable(&mut self, encoder: &mut hyper::Encoder<HttpStream>) -> Next {
if let Err(e) = encoder.write_all(self.body.as_bytes()) {
trace!("Error posting work data: {}", e);
}
encoder.close();
Next::read()
}
fn on_response(&mut self, _response: Response) -> Next {
Next::end()
}
fn on_response_readable(&mut self, _decoder: &mut hyper::Decoder<HttpStream>) -> Next {
Next::end()
}
fn on_error(&mut self, err: hyper::Error) -> Next {
trace!("Error posting work data: {}", err);
Next::end()
}
}