From 99a6802b619cb3c1c821ed9645d509dfc9cbc9b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 8 Mar 2016 15:46:44 +0100 Subject: [PATCH] Moving block sealing and transaction_queue to separate create --- Cargo.lock | 18 ++- Cargo.toml | 1 + ethcore/src/client.rs | 105 ++++------------ miner/Cargo.toml | 20 +++ miner/src/lib.rs | 86 +++++++++++++ miner/src/miner.rs | 149 +++++++++++++++++++++++ {sync => miner}/src/transaction_queue.rs | 0 parity/main.rs | 73 ++++++----- rpc/Cargo.toml | 1 + rpc/src/lib.rs | 1 + rpc/src/v1/impls/eth.rs | 13 +- sync/Cargo.toml | 5 +- sync/src/chain.rs | 56 ++------- sync/src/lib.rs | 14 +-- 14 files changed, 375 insertions(+), 167 deletions(-) create mode 100644 miner/Cargo.toml create mode 100644 miner/src/lib.rs create mode 100644 miner/src/miner.rs rename {sync => miner}/src/transaction_queue.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 510e69b59..505fcac63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,7 @@ dependencies = [ "ethcore-devtools 0.9.99", "ethcore-rpc 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "ethsync 0.9.99", "fdlimit 0.1.0", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -235,6 +236,7 @@ dependencies = [ "ethash 0.9.99", "ethcore 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "ethsync 0.9.99", "jsonrpc-core 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -282,6 +284,19 @@ dependencies = [ "vergen 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ethminer" +version = "0.9.99" +dependencies = [ + "clippy 0.0.44 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "ethcore 0.9.99", + "ethcore-util 0.9.99", + "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ethsync" version = "0.9.99" @@ -290,11 +305,10 @@ dependencies = [ "env_logger 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 0.9.99", "ethcore-util 0.9.99", + "ethminer 0.9.99", "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 9b8ec6405..7e5bc334b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ clippy = { version = "0.0.44", optional = true } ethcore-util = { path = "util" } ethcore = { path = "ethcore" } ethsync = { path = "sync" } +ethminer = { path = "miner" } ethcore-rpc = { path = "rpc", optional = true } fdlimit = { path = "util/fdlimit" } daemonize = "0.2" diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 874fc9646..af1745ca8 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -17,7 +17,6 @@ //! Blockchain database client. use std::marker::PhantomData; -use std::sync::atomic::AtomicBool; use util::*; use util::panics::*; use blockchain::{BlockChain, BlockProvider}; @@ -185,6 +184,9 @@ pub trait BlockChainClient : Sync + Send { /// Returns logs matching given filter. fn logs(&self, filter: Filter) -> Vec; + + fn prepare_sealing(&self, author: Address, extra_data: Bytes) -> Option; + fn try_seal(&self, block: ClosedBlock, seal: Vec) -> Result; } #[derive(Default, Clone, Debug, Eq, PartialEq)] @@ -219,12 +221,6 @@ pub struct Client where V: Verifier { report: RwLock, import_lock: Mutex<()>, panic_handler: Arc, - - // for sealing... - sealing_enabled: AtomicBool, - sealing_block: Mutex>, - author: RwLock
, - extra_data: RwLock, verifier: PhantomData, secret_store: Arc>, } @@ -273,10 +269,6 @@ impl Client where V: Verifier { report: RwLock::new(Default::default()), import_lock: Mutex::new(()), panic_handler: panic_handler, - sealing_enabled: AtomicBool::new(false), - sealing_block: Mutex::new(None), - author: RwLock::new(Address::new()), - extra_data: RwLock::new(Vec::new()), verifier: PhantomData, secret_store: secret_store, })) @@ -425,10 +417,6 @@ impl Client where V: Verifier { } } - if self.chain_info().best_block_hash != original_best && self.sealing_enabled.load(atomic::Ordering::Relaxed) { - self.prepare_sealing(); - } - imported } @@ -477,85 +465,46 @@ impl Client where V: Verifier { BlockId::Latest => Some(self.chain.read().unwrap().best_block_number()) } } +} - /// Get the author that we will seal blocks as. - pub fn author(&self) -> Address { - self.author.read().unwrap().clone() + +// TODO: need MinerService MinerIoHandler + +impl BlockChainClient for Client where V: Verifier { + + + fn try_seal(&self, block: ClosedBlock, seal: Vec) -> Result { + block.try_seal(self.engine.deref().deref(), seal) } - /// Set the author that we will seal blocks as. - pub fn set_author(&self, author: Address) { - *self.author.write().unwrap() = author; - } - - /// Get the extra_data that we will seal blocks wuth. - pub fn extra_data(&self) -> Bytes { - self.extra_data.read().unwrap().clone() - } - - /// Set the extra_data that we will seal blocks with. - pub fn set_extra_data(&self, extra_data: Bytes) { - *self.extra_data.write().unwrap() = extra_data; - } - - /// New chain head event. Restart mining operation. - pub fn prepare_sealing(&self) { + fn prepare_sealing(&self, author: Address, extra_data: Bytes) -> Option { + let engine = self.engine.deref().deref(); let h = self.chain.read().unwrap().best_block_hash(); + let mut b = OpenBlock::new( - self.engine.deref().deref(), + engine, self.state_db.lock().unwrap().clone(), - match self.chain.read().unwrap().block_header(&h) { Some(ref x) => x, None => {return;} }, + match self.chain.read().unwrap().block_header(&h) { Some(ref x) => x, None => { return None; } }, self.build_last_hashes(h.clone()), - self.author(), - self.extra_data() + author, + extra_data, ); - self.chain.read().unwrap().find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); }); + self.chain.read().unwrap().find_uncle_headers(&h, engine.maximum_uncle_age()) + .unwrap() + .into_iter() + .take(engine.maximum_uncle_count()) + .foreach(|h| { + b.push_uncle(h).unwrap(); + }); // TODO: push transactions. let b = b.close(); trace!("Sealing: number={}, hash={}, diff={}", b.hash(), b.block().header().difficulty(), b.block().header().number()); - *self.sealing_block.lock().unwrap() = Some(b); + Some(b) } - /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. - pub fn sealing_block(&self) -> &Mutex> { - if self.sealing_block.lock().unwrap().is_none() { - self.sealing_enabled.store(true, atomic::Ordering::Relaxed); - // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. - self.prepare_sealing(); - } - &self.sealing_block - } - - /// Submit `seal` as a valid solution for the header of `pow_hash`. - /// Will check the seal, but not actually insert the block into the chain. - pub fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error> { - let mut maybe_b = self.sealing_block.lock().unwrap(); - match *maybe_b { - Some(ref b) if b.hash() == pow_hash => {} - _ => { return Err(Error::PowHashInvalid); } - } - - let b = maybe_b.take(); - match b.unwrap().try_seal(self.engine.deref().deref(), seal) { - Err(old) => { - *maybe_b = Some(old); - Err(Error::PowInvalid) - } - Ok(sealed) => { - // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. - try!(self.import_block(sealed.rlp_bytes())); - Ok(()) - } - } - } -} - -// TODO: need MinerService MinerIoHandler - -impl BlockChainClient for Client where V: Verifier { fn block_header(&self, id: BlockId) -> Option { let chain = self.chain.read().unwrap(); Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) diff --git a/miner/Cargo.toml b/miner/Cargo.toml new file mode 100644 index 000000000..0972aa122 --- /dev/null +++ b/miner/Cargo.toml @@ -0,0 +1,20 @@ +[package] +description = "Ethminer library" +homepage = "http://ethcore.io" +license = "GPL-3.0" +name = "ethminer" +version = "0.9.99" +authors = ["Ethcore "] + +[dependencies] +ethcore-util = { path = "../util" } +ethcore = { path = "../ethcore" } +log = "0.3" +env_logger = "0.3" +rustc-serialize = "0.3" +rayon = "0.3.1" +clippy = { version = "0.0.44", optional = true } + +[features] +dev = ["clippy"] +default = [] diff --git a/miner/src/lib.rs b/miner/src/lib.rs new file mode 100644 index 000000000..e8a50e9b5 --- /dev/null +++ b/miner/src/lib.rs @@ -0,0 +1,86 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +#![warn(missing_docs)] +#![cfg_attr(feature="dev", feature(plugin))] +#![cfg_attr(feature="dev", plugin(clippy))] + +#[macro_use] +extern crate log; +#[macro_use] +extern crate ethcore_util as util; +extern crate ethcore; +extern crate env_logger; +extern crate rayon; + +mod miner; +mod transaction_queue; + +use util::{Bytes, H256, Address}; +use std::ops::*; +use std::sync::*; +use util::TimerToken; +use ethcore::block::*; +use ethcore::error::*; +use ethcore::client::{Client, BlockChainClient}; +use ethcore::transaction::*; +use miner::Miner; + +pub struct EthMiner { + miner: Miner, + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, +} + +impl EthMiner { + /// Creates and register protocol with the network service + pub fn new(chain: Arc) -> Arc { + Arc::new(EthMiner { + miner: Miner::new(), + chain: chain, + }) + } + + pub fn sealing_block(&self) -> &Mutex> { + self.miner.sealing_block(self.chain.deref()) + } + + pub fn submit_seal(&self, pow_hash: H256, seal: Vec) -> Result<(), Error> { + self.miner.submit_seal(self.chain.deref(), pow_hash, seal) + } + + /// Set the author that we will seal blocks as. + pub fn set_author(&self, author: Address) { + self.miner.set_author(author); + } + + /// Set the extra_data that we will seal blocks with. + pub fn set_extra_data(&self, extra_data: Bytes) { + self.miner.set_extra_data(extra_data); + } + + pub fn import_transactions(&self, transactions: Vec) { + let chain = self.chain.deref(); + let fetch_latest_nonce = |a : &Address| chain.nonce(a); + + self.miner.import_transactions(transactions, fetch_latest_nonce); + } + + pub fn chain_new_blocks(&self, good: &[H256], bad: &[H256], retracted: &[H256]) { + let mut chain = self.chain.deref(); + self.miner.chain_new_blocks(chain, good, bad, retracted); + } +} diff --git a/miner/src/miner.rs b/miner/src/miner.rs new file mode 100644 index 000000000..1a48d5288 --- /dev/null +++ b/miner/src/miner.rs @@ -0,0 +1,149 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use util::*; +use std::sync::atomic::AtomicBool; +use rayon::prelude::*; +use ethcore::views::{HeaderView, BlockView}; +use ethcore::header::{BlockNumber, Header as BlockHeader}; +use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; +use ethcore::block::*; +use ethcore::error::*; +use ethcore::transaction::SignedTransaction; +use transaction_queue::TransactionQueue; + +pub struct Miner { + /// Transactions Queue + transaction_queue: Mutex, + + // for sealing... + sealing_enabled: AtomicBool, + sealing_block: Mutex>, + author: RwLock
, + extra_data: RwLock, +} + +impl Miner { + pub fn new() -> Miner { + Miner { + transaction_queue: Mutex::new(TransactionQueue::new()), + sealing_enabled: AtomicBool::new(false), + sealing_block: Mutex::new(None), + author: RwLock::new(Address::new()), + extra_data: RwLock::new(Vec::new()), + } + } + + pub fn import_transactions(&self, transactions: Vec, fetch_nonce: T) + where T: Fn(&Address) -> U256 { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(transactions, fetch_nonce); + } + + /// Get the author that we will seal blocks as. + pub fn author(&self) -> Address { + self.author.read().unwrap().clone() + } + + /// Set the author that we will seal blocks as. + pub fn set_author(&self, author: Address) { + *self.author.write().unwrap() = author; + } + + /// Get the extra_data that we will seal blocks wuth. + pub fn extra_data(&self) -> Bytes { + self.extra_data.read().unwrap().clone() + } + + /// Set the extra_data that we will seal blocks with. + pub fn set_extra_data(&self, extra_data: Bytes) { + *self.extra_data.write().unwrap() = extra_data; + } + + /// New chain head event. Restart mining operation. + fn prepare_sealing(&self, chain: &BlockChainClient) { + let b = chain.prepare_sealing(self.author.read().unwrap().clone(), self.extra_data.read().unwrap().clone()); + *self.sealing_block.lock().unwrap() = b; + } + + /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. + pub fn sealing_block(&self, chain: &BlockChainClient) -> &Mutex> { + if self.sealing_block.lock().unwrap().is_none() { + self.sealing_enabled.store(true, atomic::Ordering::Relaxed); + // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. + self.prepare_sealing(chain); + } + &self.sealing_block + } + + /// Submit `seal` as a valid solution for the header of `pow_hash`. + /// Will check the seal, but not actually insert the block into the chain. + pub fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec) -> Result<(), Error> { + let mut maybe_b = self.sealing_block.lock().unwrap(); + match *maybe_b { + Some(ref b) if b.hash() == pow_hash => {} + _ => { return Err(Error::PowHashInvalid); } + } + + let b = maybe_b.take(); + match chain.try_seal(b.unwrap(), seal) { + Err(old) => { + *maybe_b = Some(old); + Err(Error::PowInvalid) + } + Ok(sealed) => { + // TODO: commit DB from `sealed.drain` and make a VerifiedBlock to skip running the transactions twice. + try!(chain.import_block(sealed.rlp_bytes())); + Ok(()) + } + } + } + + /// called when block is imported to chain, updates transactions queue and propagates the blocks + pub fn chain_new_blocks(&self, chain: &BlockChainClient, good: &[H256], bad: &[H256], _retracted: &[H256]) { + fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { + let block = chain + .block(BlockId::Hash(hash.clone())) + // Client should send message after commit to db and inserting to chain. + .expect("Expected in-chain blocks."); + let block = BlockView::new(&block); + block.transactions() + } + + { + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + bad.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } + + if self.sealing_enabled.load(atomic::Ordering::Relaxed) { + self.prepare_sealing(chain); + } + } +} diff --git a/sync/src/transaction_queue.rs b/miner/src/transaction_queue.rs similarity index 100% rename from sync/src/transaction_queue.rs rename to miner/src/transaction_queue.rs diff --git a/parity/main.rs b/parity/main.rs index 43b0504f1..ef088ab5b 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -24,6 +24,7 @@ extern crate rustc_serialize; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate ethminer; #[macro_use] extern crate log as rlog; extern crate env_logger; @@ -49,6 +50,7 @@ use ethcore::client::*; use ethcore::service::{ClientService, NetSyncMessage}; use ethcore::ethereum; use ethsync::{EthSync, SyncConfig}; +use ethminer::{EthMiner}; use docopt::Docopt; use daemonize::Daemonize; use number_prefix::{binary_prefix, Standalone, Prefixed}; @@ -79,6 +81,7 @@ Protocol Options: --networkid INDEX Override the network identifier from the chain we are on. --archive Client should not prune the state/storage trie. -d --datadir PATH Specify the database & configuration directory path [default: $HOME/.parity] + --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] --identity NAME Specify your node's name. Networking Options: @@ -107,13 +110,13 @@ API and Console Options: Sealing/Mining Options: --author ADDRESS Specify the block author (aka "coinbase") address for sending block rewards from sealed blocks [default: 0037a6b811ffeb6e072da21179d11b1406371c63]. - --extradata STRING Specify a custom extra-data for authored blocks, no more than 32 characters. + --extradata STRING Specify a custom extra-data for authored blocks, no more than 32 characters. Memory Footprint Options: --cache-pref-size BYTES Specify the prefered size of the blockchain cache in bytes [default: 16384]. --cache-max-size BYTES Specify the maximum size of the blockchain cache in bytes [default: 262144]. --queue-max-size BYTES Specify the maximum size of memory to use for block queue [default: 52428800]. - --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with + --cache MEGABYTES Set total amount of cache to use for the entire system, mutually exclusive with other cache options (geth-compatible). Miscellaneous Options: @@ -129,7 +132,7 @@ struct Args { arg_enode: Vec, flag_chain: String, flag_testnet: bool, - flag_db_path: String, + flag_datadir: String, flag_networkid: Option, flag_identity: String, flag_cache: Option, @@ -189,7 +192,7 @@ fn setup_log(init: &Option) { } #[cfg(feature = "rpc")] -fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) { +fn setup_rpc_server(client: Arc, sync: Arc, miner: Arc, url: &str, cors_domain: &str, apis: Vec<&str>) { use rpc::v1::*; let mut server = rpc::HttpServer::new(1); @@ -198,7 +201,7 @@ fn setup_rpc_server(client: Arc, sync: Arc, url: &str, cors_dom "web3" => server.add_delegate(Web3Client::new().to_delegate()), "net" => server.add_delegate(NetClient::new(&sync).to_delegate()), "eth" => { - server.add_delegate(EthClient::new(&client, &sync).to_delegate()); + server.add_delegate(EthClient::new(&client, &sync, &miner).to_delegate()); server.add_delegate(EthFilterClient::new(&client).to_delegate()); } _ => { @@ -238,7 +241,7 @@ impl Configuration { } fn path(&self) -> String { - self.args.flag_db_path.replace("$HOME", env::home_dir().unwrap().to_str().unwrap()) + self.args.flag_datadir.replace("$HOME", env::home_dir().unwrap().to_str().unwrap()) } fn author(&self) -> Address { @@ -323,6 +326,32 @@ impl Configuration { ret } + fn client_config(&self) -> ClientConfig { + let mut client_config = ClientConfig::default(); + match self.args.flag_cache { + Some(mb) => { + client_config.blockchain.max_cache_size = mb * 1024 * 1024; + client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2; + } + None => { + client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; + client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + } + } + client_config.prefer_journal = !self.args.flag_archive; + client_config.name = self.args.flag_identity.clone(); + client_config.queue.max_mem_use = self.args.flag_queue_max_size; + client_config + } + + fn sync_config(&self, spec: &Spec) -> SyncConfig { + let mut sync_config = SyncConfig::default(); + sync_config.network_id = self.args.flag_networkid.as_ref().map(|id| { + U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id)) + }).unwrap_or(spec.network_id()); + sync_config + } + fn execute(&self) { if self.args.flag_version { print_version(); @@ -346,31 +375,19 @@ impl Configuration { let spec = self.spec(); let net_settings = self.net_settings(&spec); - let mut sync_config = SyncConfig::default(); - sync_config.network_id = self.args.flag_networkid.as_ref().map(|id| U256::from_str(id).unwrap_or_else(|_| die!("{}: Invalid index given with --networkid", id))).unwrap_or(spec.network_id()); + let sync_config = self.sync_config(&spec); // Build client - let mut client_config = ClientConfig::default(); - match self.args.flag_cache { - Some(mb) => { - client_config.blockchain.max_cache_size = mb * 1024 * 1024; - client_config.blockchain.pref_cache_size = client_config.blockchain.max_cache_size / 2; - } - None => { - client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; - client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; - } - } - client_config.prefer_journal = !self.args.flag_archive; - client_config.name = self.args.flag_identity.clone(); - client_config.queue.max_mem_use = self.args.flag_queue_max_size; - let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); - let client = service.client().clone(); - client.set_author(self.author()); - client.set_extra_data(self.extra_data()); + let mut service = ClientService::start(self.client_config(), spec, net_settings, &Path::new(&self.path())).unwrap(); + let client = service.client(); + + // Miner + let miner = EthMiner::new(client.clone()); + miner.set_author(self.author()); + miner.set_extra_data(self.extra_data()); // Sync - let sync = EthSync::register(service.network(), sync_config, client); + let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone()); // Setup rpc if self.args.flag_jsonrpc || self.args.flag_rpc { @@ -382,7 +399,7 @@ impl Configuration { let cors = self.args.flag_rpccorsdomain.as_ref().unwrap_or(&self.args.flag_jsonrpc_cors); // TODO: use this as the API list. let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis); - setup_rpc_server(service.client(), sync.clone(), &url, cors, apis.split(",").collect()); + setup_rpc_server(service.client(), sync.clone(), miner.clone(), &url, cors, apis.split(",").collect()); } // Register IO handler diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index bfdf8f2d3..f6d468f47 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -18,6 +18,7 @@ ethcore-util = { path = "../util" } ethcore = { path = "../ethcore" } ethash = { path = "../ethash" } ethsync = { path = "../sync" } +ethminer = { path = "../miner" } clippy = { version = "0.0.44", optional = true } rustc-serialize = "0.3" transient-hashmap = "0.1" diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 0653a0c33..299084a6d 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -27,6 +27,7 @@ extern crate jsonrpc_http_server; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate ethminer; extern crate transient_hashmap; use self::jsonrpc_core::{IoHandler, IoDelegate}; diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 7113c55b1..11c6fe8d0 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::sync::{Arc, Weak, Mutex, RwLock}; use ethsync::{EthSync, SyncState}; +use ethminer::{EthMiner}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; @@ -36,15 +37,17 @@ use v1::helpers::{PollFilter, PollManager}; pub struct EthClient { client: Weak, sync: Weak, + miner: Weak, hashrates: RwLock>, } impl EthClient { /// Creates new EthClient. - pub fn new(client: &Arc, sync: &Arc) -> Self { + pub fn new(client: &Arc, sync: &Arc, miner: &Arc) -> Self { EthClient { client: Arc::downgrade(client), sync: Arc::downgrade(sync), + miner: Arc::downgrade(miner), hashrates: RwLock::new(HashMap::new()), } } @@ -220,8 +223,8 @@ impl Eth for EthClient { fn work(&self, params: Params) -> Result { match params { Params::None => { - let c = take_weak!(self.client); - let u = c.sealing_block().lock().unwrap(); + let miner = take_weak!(self.miner); + let u = miner.sealing_block().lock().unwrap(); match *u { Some(ref b) => { let pow_hash = b.hash(); @@ -239,9 +242,9 @@ impl Eth for EthClient { fn submit_work(&self, params: Params) -> Result { from_params::<(H64, H256, H256)>(params).and_then(|(nonce, pow_hash, mix_hash)| { // trace!("Decoded: nonce={}, pow_hash={}, mix_hash={}", nonce, pow_hash, mix_hash); - let c = take_weak!(self.client); + let miner = take_weak!(self.miner); let seal = vec![encode(&mix_hash).to_vec(), encode(&nonce).to_vec()]; - let r = c.submit_seal(pow_hash, seal); + let r = miner.submit_seal(pow_hash, seal); to_value(&r.is_ok()) }) } diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 0097cd47e..748065deb 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -10,15 +10,14 @@ authors = ["Ethcore , + /// Miner + miner: Arc, } type RlpResponseResult = Result, PacketDecodeError>; impl ChainSync { /// Create a new instance of syncing strategy. - pub fn new(config: SyncConfig) -> ChainSync { + pub fn new(config: SyncConfig, miner: Arc) -> ChainSync { ChainSync { state: SyncState::NotSynced, starting_block: 0, @@ -239,7 +238,7 @@ impl ChainSync { last_sent_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, - transaction_queue: Mutex::new(TransactionQueue::new()), + miner: miner, } } @@ -298,7 +297,6 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; - self.transaction_queue.lock().unwrap().clear(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -927,16 +925,15 @@ impl ChainSync { } /// Called when peer sends us new transactions fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let chain = io.chain(); let item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); - let fetch_latest_nonce = |a : &Address| chain.nonce(a); - let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let mut transactions = Vec::with_capacity(item_count); for i in 0..item_count { let tx: SignedTransaction = try!(r.val_at(i)); - transaction_queue.add(tx, &fetch_latest_nonce); + transactions.push(tx); } + self.miner.import_transactions(transactions); Ok(()) } @@ -1263,38 +1260,9 @@ impl ChainSync { self.check_resume(io); } - /// called when block is imported to chain, updates transactions queue and propagates the blocks - pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { - fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { - let block = chain - .block(BlockId::Hash(hash.clone())) - // Client should send message after commit to db and inserting to chain. - .expect("Expected in-chain blocks."); - let block = BlockView::new(&block); - block.transactions() - } - - - { - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); - - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - bad.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); - } - + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], retracted: &[H256]) { + // notify miner + self.miner.chain_new_blocks(good, bad, retracted); // Propagate latests blocks self.propagate_latest_blocks(io); // TODO [todr] propagate transactions? diff --git a/sync/src/lib.rs b/sync/src/lib.rs index b5869642c..0d6044135 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,27 +51,27 @@ extern crate log; #[macro_use] extern crate ethcore_util as util; extern crate ethcore; +extern crate ethminer; extern crate env_logger; extern crate time; extern crate rand; -extern crate rayon; #[macro_use] extern crate heapsize; use std::ops::*; use std::sync::*; -use ethcore::client::Client; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId}; use util::TimerToken; use util::{U256, ONE_U256}; -use chain::ChainSync; +use ethcore::client::Client; use ethcore::service::SyncMessage; +use ethminer::EthMiner; use io::NetSyncIo; +use chain::ChainSync; mod chain; mod io; mod range_collection; -mod transaction_queue; #[cfg(test)] mod tests; @@ -105,10 +105,10 @@ pub use self::chain::{SyncStatus, SyncState}; impl EthSync { /// Creates and register protocol with the network service - pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc) -> Arc { + pub fn register(service: &mut NetworkService, config: SyncConfig, chain: Arc, miner: Arc) -> Arc { let sync = Arc::new(EthSync { chain: chain, - sync: RwLock::new(ChainSync::new(config)), + sync: RwLock::new(ChainSync::new(config, miner)), }); service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler"); sync @@ -154,7 +154,7 @@ impl NetworkProtocolHandler for EthSync { fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { + SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted} => { let mut sync_io = NetSyncIo::new(io, self.chain.deref()); self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); },