Merge branch 'master' into signer-wsnotification

Conflicts:
	rpc/src/v1/impls/mod.rs
This commit is contained in:
Tomasz Drwięga 2016-06-02 12:22:52 +02:00
commit 716c9bb43c
47 changed files with 608 additions and 653 deletions

View File

@ -33,7 +33,7 @@ env:
global:
# GH_TOKEN
- secure: bumJASbZSU8bxJ0EyPUJmu16AiV9EXOpyOj86Jlq/Ty9CfwGqsSXt96uDyE+OUJf34RUFQMsw0nk37/zC4lcn6kqk2wpuH3N/o85Zo/cVZY/NusBWLQqtT5VbYWsV+u2Ua4Tmmsw8yVYQhYwU2ZOejNpflL+Cs9XGgORp1L+/gMRMC2y5Se6ZhwnKPQlRJ8LGsG1dzjQULxzADIt3/zuspNBS8a2urJwlHfGMkvHDoUWCviP/GXoSqw3TZR7FmKyxE19I8n9+iSvm9+oZZquvcgfUxMHn8Gq/b44UbPvjtFOg2yam4xdWXF/RyWCHdc/R9EHorSABeCbefIsm+zcUF3/YQxwpSxM4IZEeH2rTiC7dcrsKw3XsO16xFQz5YI5Bay+CT/wTdMmJd7DdYz7Dyf+pOvcM9WOf/zorxYWSBOMYy0uzbusU2iyIghQ82s7E/Ahg+WARtPgkuTLSB5aL1oCTBKHqQscMr7lo5Ti6RpWLxEdTQMBznc+bMr+6dEtkEcG9zqc6cE9XX+ox3wTU6+HVMfQ1ltCntJ4UKcw3A6INEbw9wgocQa812CIASQ2fE+SCAbz6JxBjIAlFUnD1lUB7S8PdMPwn9plfQgKQ2A5YZqg6FnBdf0rQXIJYxQWKHXj/rBHSUCT0tHACDlzTA+EwWggvkP5AGIxRxm8jhw=
- TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethminer -p ethjson -p ethcore-dapps -p ethcore-signer"
- TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity -p ethjson -p ethcore-dapps -p ethcore-signer"
- ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}"
- KCOV_FEATURES=""
- KCOV_CMD="./kcov-master/tmp/usr/local/bin/kcov --exclude-pattern /usr/,/.cargo,/root/.multirust,src/tests,util/json-tests,util/src/network/tests,sync/src/tests,ethcore/src/tests,ethcore/src/evm/tests target/kcov"
@ -72,7 +72,6 @@ after_success: |
$KCOV_CMD target/debug/deps/ethcore_rpc-* &&
$KCOV_CMD target/debug/deps/ethcore_dapps-* &&
$KCOV_CMD target/debug/deps/ethcore_signer-* &&
$KCOV_CMD target/debug/deps/ethminer-* &&
$KCOV_CMD target/debug/deps/ethjson-* &&
$KCOV_CMD target/debug/parity-* &&
[ $TRAVIS_BRANCH = master ] &&

25
Cargo.lock generated
View File

@ -17,7 +17,6 @@ dependencies = [
"ethcore-rpc 1.2.0",
"ethcore-signer 1.2.0",
"ethcore-util 1.2.0",
"ethminer 1.2.0",
"ethsync 1.2.0",
"fdlimit 0.1.0",
"hyper 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -255,6 +254,7 @@ dependencies = [
"lazy_static 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rust-crypto 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"syntex 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -334,7 +334,6 @@ dependencies = [
"ethcore-devtools 1.2.0",
"ethcore-util 1.2.0",
"ethjson 0.1.0",
"ethminer 1.2.0",
"ethsync 1.2.0",
"json-ipc-server 0.1.0 (git+https://github.com/ethcore/json-ipc-server.git)",
"jsonrpc-core 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)",
@ -391,7 +390,7 @@ dependencies = [
"rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"sha3 0.1.0",
"slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"slab 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"target_info 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
"tiny-keccak 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
@ -410,20 +409,6 @@ dependencies = [
"syntex 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ethminer"
version = "1.2.0"
dependencies = [
"clippy 0.0.69 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 1.2.0",
"ethcore-util 1.2.0",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ethsync"
version = "1.2.0"
@ -432,7 +417,6 @@ dependencies = [
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 1.2.0",
"ethcore-util 1.2.0",
"ethminer 1.2.0",
"heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1192,6 +1176,11 @@ name = "slab"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slab"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "solicit"
version = "0.4.4"

View File

@ -27,7 +27,6 @@ clippy = { version = "0.0.69", optional = true}
ethcore = { path = "ethcore" }
ethcore-util = { path = "util" }
ethsync = { path = "sync" }
ethminer = { path = "miner" }
ethcore-devtools = { path = "devtools" }
ethcore-rpc = { path = "rpc", optional = true }
ethcore-signer = { path = "signer", optional = true }
@ -46,7 +45,7 @@ default-features = false
default = ["rpc", "dapps", "ethcore-signer"]
rpc = ["ethcore-rpc"]
dapps = ["ethcore-dapps"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethminer/dev",
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev",
"ethcore-dapps/dev", "ethcore-signer/dev"]
travis-beta = ["ethcore/json-tests"]
travis-nightly = ["ethcore/json-tests", "dev"]

View File

@ -1,4 +1,5 @@
# ethcore
# [Parity](https://ethcore.io/parity.html)
### Fast, light, and robust Ethereum implementation
[![Build Status][travis-image]][travis-url] [![Coverage Status][coveralls-image]][coveralls-url] [![Join the chat at https://gitter.im/trogdoro/xiki][gitter-image]][gitter-url] [![GPLv3][license-image]][license-url]
@ -11,30 +12,64 @@
[license-image]: https://img.shields.io/badge/license-GPL%20v3-green.svg
[license-url]: http://www.gnu.org/licenses/gpl-3.0.en.html
[Documentation](http://ethcore.github.io/parity/ethcore/index.html)
[Internal Documentation](http://ethcore.github.io/parity/ethcore/index.html)
### Building from source
----
First (if you don't already have it) get multirust:
## About Parity
Parity's goal is to be the fastest, lightest, and most secure Ethereum client. We are developing Parity using the sophisticated and
cutting-edge Rust programming language. Parity is licensed under the GPLv3, and can be used for all your Ethereum needs.
By default, Parity will run a JSONRPC server on `127.0.0.1:8545`. This is fully configurable and supports a number
of RPC APIs.
Parity also runs a server for running decentralized apps, or "Dapps", on `http://127.0.0.1:8080`.
This includes a few useful Dapps, including Ethereum Wallet, Maker OTC, and a node status page.
In a near-future release, it will be easy to install Dapps and use them through this web interface.
If you run into an issue while using parity, feel free to file one in this repository
or hop on our [gitter chat room]([gitter-url]) to ask a question. We are glad to help!
Parity's current release is 1.1. You can download it at https://ethcore.io/parity.html or follow the instructions
below to build from source.
----
## Building from source
Parity is fully compatible with Stable Rust.
We recommend installing Rust through [multirust](https://github.com/brson/multirust). If you don't already have multirust, you can install it like this:
- Linux:
```bash
curl -sf https://raw.githubusercontent.com/brson/multirust/master/quick-install.sh | sh
$ curl -sf https://raw.githubusercontent.com/brson/multirust/master/quick-install.sh | sh
```
- OSX with Homebrew:
```bash
brew update && brew install multirust
multirust default stable
$ brew update && brew install multirust
$ multirust default stable
```
Then, download and build Parity:
```bash
# download Parity code
git clone https://github.com/ethcore/parity
cd parity
$ git clone https://github.com/ethcore/parity
$ cd parity
# build in release mode
cargo build --release
$ cargo build --release
```
This will produce an executable in the `target/release` subdirectory.
Either run `cd target/release`, or copy `target/release/parity` to another location.
To get started, just run
```bash
$ parity
```
and parity will begin syncing the Ethereum blockchain.

View File

@ -21,6 +21,8 @@ extern crate rand;
pub mod random_path;
pub mod test_socket;
pub mod stop_guard;
pub use random_path::*;
pub use test_socket::*;
pub use stop_guard::*;

View File

@ -26,7 +26,11 @@ pub struct RandomTempPath {
}
pub fn random_filename() -> String {
(0..8).map(|_| ((random::<f32>() * 26.0) as u8 + 97) as char).collect()
random_str(8)
}
pub fn random_str(len: usize) -> String {
(0..len).map(|_| ((random::<f32>() * 26.0) as u8 + 97) as char).collect()
}
impl RandomTempPath {
@ -54,6 +58,12 @@ impl RandomTempPath {
pub fn as_str(&self) -> &str {
self.path.to_str().unwrap()
}
pub fn new_in(&self, name: &str) -> String {
let mut path = self.path.clone();
path.push(name);
path.to_str().unwrap().to_owned()
}
}
impl Drop for RandomTempPath {

View File

@ -14,12 +14,32 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
extern crate rustc_version;
//! Stop guard mod
use rustc_version::{version_meta, Channel};
use std::sync::Arc;
use std::sync::atomic::*;
fn main() {
if let Channel::Nightly = version_meta().channel {
println!("cargo:rustc-cfg=nightly");
/// Stop guard that will set a stop flag on drop
pub struct StopGuard {
flag: Arc<AtomicBool>,
}
impl StopGuard {
/// Create a stop guard
pub fn new() -> StopGuard {
StopGuard {
flag: Arc::new(AtomicBool::new(false))
}
}
/// Share stop guard between the threads
pub fn share(&self) -> Arc<AtomicBool> {
self.flag.clone()
}
}
impl Drop for StopGuard {
fn drop(&mut self) {
self.flag.store(true, Ordering::Relaxed)
}
}

View File

@ -29,6 +29,7 @@ ethcore-devtools = { path = "../devtools" }
ethjson = { path = "../json" }
bloomchain = "0.1"
"ethcore-ipc" = { path = "../ipc/rpc" }
rayon = "0.3.1"
[features]
jit = ["evmjit"]

View File

@ -37,7 +37,7 @@ use filter::Filter;
use log_entry::LocalizedLogEntry;
use block_queue::{BlockQueue, BlockQueueInfo};
use blockchain::{BlockChain, BlockProvider, TreeRoute, ImportRoute};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient, TraceFilter};
use client::{BlockID, TransactionID, UncleID, TraceId, ClientConfig, BlockChainClient, MiningBlockChainClient, TraceFilter};
use client::Error as ClientError;
use env_info::EnvInfo;
use executive::{Executive, Executed, TransactOptions, contract_address};
@ -48,6 +48,7 @@ use trace;
pub use types::blockchain_info::BlockChainInfo;
pub use types::block_status::BlockStatus;
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService, TransactionImportResult, AccountDetails};
impl fmt::Display for BlockChainInfo {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@ -90,6 +91,7 @@ pub struct Client<V = CanonVerifier> where V: Verifier {
panic_handler: Arc<PanicHandler>,
verifier: PhantomData<V>,
vm_factory: Arc<EvmFactory>,
miner: Arc<Miner>,
}
const HISTORY: u64 = 1200;
@ -102,8 +104,8 @@ const CLIENT_DB_VER_STR: &'static str = "5.3";
impl Client<CanonVerifier> {
/// Create a new client with given spec and DB path.
pub fn new(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client>, ClientError> {
Client::<CanonVerifier>::new_with_verifier(config, spec, path, message_channel)
pub fn new(config: ClientConfig, spec: Spec, path: &Path, miner: Arc<Miner>, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client>, ClientError> {
Client::<CanonVerifier>::new_with_verifier(config, spec, path, miner, message_channel)
}
}
@ -126,7 +128,14 @@ pub fn append_path(path: &Path, item: &str) -> String {
impl<V> Client<V> where V: Verifier {
/// Create a new client with given spec and DB path and custom verifier.
pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel<NetSyncMessage> ) -> Result<Arc<Client<V>>, ClientError> {
pub fn new_with_verifier(
config: ClientConfig,
spec: Spec,
path: &Path,
miner: Arc<Miner>,
message_channel: IoChannel<NetSyncMessage>)
-> Result<Arc<Client<V>>, ClientError>
{
let path = get_db_path(path, config.pruning, spec.genesis_header().hash());
let gb = spec.genesis_block();
let chain = Arc::new(BlockChain::new(config.blockchain, &gb, &path));
@ -155,6 +164,7 @@ impl<V> Client<V> where V: Verifier {
panic_handler: panic_handler,
verifier: PhantomData,
vm_factory: Arc::new(EvmFactory::new(config.vm_type)),
miner: miner,
};
Ok(Arc::new(client))
@ -328,6 +338,11 @@ impl<V> Client<V> where V: Verifier {
{
if !imported_blocks.is_empty() && self.block_queue.queue_info().is_empty() {
let (enacted, retracted) = self.calculate_enacted_retracted(import_results);
if self.queue_info().is_empty() {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
imported: imported_blocks,
invalid: invalid_blocks,
@ -339,7 +354,7 @@ impl<V> Client<V> where V: Verifier {
{
if self.chain_info().best_block_hash != original_best {
io.send(NetworkIoMessage::User(SyncMessage::NewChainHead)).unwrap();
self.miner.update_sealing(self);
}
}
@ -448,81 +463,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Executive::new(&mut state, &env_info, self.engine.deref().deref(), &self.vm_factory).transact(t, options)
}
// TODO [todr] Should be moved to miner crate eventually.
fn try_seal(&self, block: LockedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock> {
block.try_seal(self.engine.deref().deref(), seal)
}
fn vm_factory(&self) -> &EvmFactory {
&self.vm_factory
}
// TODO [todr] Should be moved to miner crate eventually.
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> (Option<ClosedBlock>, HashSet<H256>) {
let engine = self.engine.deref().deref();
let h = self.chain.best_block_hash();
let mut invalid_transactions = HashSet::new();
let mut b = OpenBlock::new(
engine,
&self.vm_factory,
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.lock().unwrap().boxed_clone(),
match self.chain.block_header(&h) { Some(ref x) => x, None => { return (None, invalid_transactions) } },
self.build_last_hashes(h.clone()),
author,
gas_floor_target,
extra_data,
);
// Add uncles
self.chain
.find_uncle_headers(&h, engine.maximum_uncle_age())
.unwrap()
.into_iter()
.take(engine.maximum_uncle_count())
.foreach(|h| {
b.push_uncle(h).unwrap();
});
// Add transactions
let block_number = b.block().header().number();
let min_tx_gas = U256::from(self.engine.schedule(&b.env_info()).tx_gas);
for tx in transactions {
// Push transaction to block
let hash = tx.hash();
let import = b.push_transaction(tx, None);
match import {
Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, .. })) => {
trace!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?}", hash);
// Exit early if gas left is smaller then min_tx_gas
if gas_limit - gas_used < min_tx_gas {
break;
}
},
Err(e) => {
invalid_transactions.insert(hash);
trace!(target: "miner",
"Error adding transaction to block: number={}. transaction_hash={:?}, Error: {:?}",
block_number, hash, e);
},
_ => {}
}
}
// And close
let b = b.close();
trace!(target: "miner", "Sealing: number={}, hash={}, diff={}",
b.block().header().number(),
b.hash(),
b.block().header().difficulty()
);
(Some(b), invalid_transactions)
}
fn block_header(&self, id: BlockID) -> Option<Bytes> {
Self::block_hash(&self.chain, id).and_then(|hash| self.chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec()))
}
@ -774,6 +718,89 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
fn last_hashes(&self) -> LastHashes {
self.build_last_hashes(self.chain.best_block_hash())
}
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, Error>> {
let fetch_account = |a: &Address| AccountDetails {
nonce: self.latest_nonce(a),
balance: self.latest_balance(a),
};
self.miner.import_transactions(transactions, fetch_account)
}
fn all_transactions(&self) -> Vec<SignedTransaction> {
self.miner.all_transactions()
}
}
impl<V> MiningBlockChainClient for Client<V> where V: Verifier {
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> (Option<ClosedBlock>, HashSet<H256>) {
let engine = self.engine.deref().deref();
let h = self.chain.best_block_hash();
let mut invalid_transactions = HashSet::new();
let mut b = OpenBlock::new(
engine,
&self.vm_factory,
false, // TODO: this will need to be parameterised once we want to do immediate mining insertion.
self.state_db.lock().unwrap().boxed_clone(),
match self.chain.block_header(&h) { Some(ref x) => x, None => { return (None, invalid_transactions) } },
self.build_last_hashes(h.clone()),
author,
gas_floor_target,
extra_data,
);
// Add uncles
self.chain
.find_uncle_headers(&h, engine.maximum_uncle_age())
.unwrap()
.into_iter()
.take(engine.maximum_uncle_count())
.foreach(|h| {
b.push_uncle(h).unwrap();
});
// Add transactions
let block_number = b.block().header().number();
let min_tx_gas = U256::from(self.engine.schedule(&b.env_info()).tx_gas);
for tx in transactions {
// Push transaction to block
let hash = tx.hash();
let import = b.push_transaction(tx, None);
match import {
Err(Error::Execution(ExecutionError::BlockGasLimitReached { gas_limit, gas_used, .. })) => {
trace!(target: "miner", "Skipping adding transaction to block because of gas limit: {:?}", hash);
// Exit early if gas left is smaller then min_tx_gas
if gas_limit - gas_used < min_tx_gas {
break;
}
},
Err(e) => {
invalid_transactions.insert(hash);
trace!(target: "miner",
"Error adding transaction to block: number={}. transaction_hash={:?}, Error: {:?}",
block_number, hash, e);
},
_ => {}
}
}
// And close
let b = b.close();
trace!(target: "miner", "Sealing: number={}, hash={}, diff={}",
b.block().header().number(),
b.hash(),
b.block().header().difficulty()
);
(Some(b), invalid_transactions)
}
fn try_seal(&self, block: LockedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock> {
block.try_seal(self.engine.deref().deref(), seal)
}
}
impl MayPanic for Client {

View File

@ -46,6 +46,8 @@ use error::{ImportResult, ExecutionError};
use receipt::LocalizedReceipt;
use trace::LocalizedTrace;
use evm::Factory as EvmFactory;
use miner::{TransactionImportResult};
use error::Error as EthError;
/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send {
@ -154,15 +156,6 @@ pub trait BlockChainClient : Sync + Send {
/// Returns logs matching given filter.
fn logs(&self, filter: Filter) -> Vec<LocalizedLogEntry>;
// TODO [todr] Should be moved to miner crate eventually.
/// Returns ClosedBlock prepared for sealing.
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> (Option<ClosedBlock>, HashSet<H256>);
// TODO [todr] Should be moved to miner crate eventually.
/// Attempts to seal given block. Returns `SealedBlock` on success and the same block in case of error.
fn try_seal(&self, block: LockedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock>;
/// Makes a non-persistent transaction call.
fn call(&self, t: &SignedTransaction) -> Result<Executed, ExecutionError>;
@ -183,5 +176,20 @@ pub trait BlockChainClient : Sync + Send {
/// Get last hashes starting from best block.
fn last_hashes(&self) -> LastHashes;
/// import transactions from network/other 3rd party
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>>;
/// list all transactions
fn all_transactions(&self) -> Vec<SignedTransaction>;
}
/// Extended client interface used for mining
pub trait MiningBlockChainClient : BlockChainClient {
/// Attempts to seal given block. Returns `SealedBlock` on success and the same block in case of error.
fn try_seal(&self, block: LockedBlock, seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock>;
/// Returns ClosedBlock prepared for sealing.
fn prepare_sealing(&self, author: Address, gas_floor_target: U256, extra_data: Bytes, transactions: Vec<SignedTransaction>)
-> (Option<ClosedBlock>, HashSet<H256>);
}

View File

@ -20,7 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrder};
use util::*;
use transaction::{Transaction, LocalizedTransaction, SignedTransaction, Action};
use blockchain::TreeRoute;
use client::{BlockChainClient, BlockChainInfo, BlockStatus, BlockID, TransactionID, UncleID, TraceId, TraceFilter, LastHashes};
use client::{BlockChainClient, MiningBlockChainClient, BlockChainInfo, BlockStatus, BlockID, TransactionID, UncleID, TraceId, TraceFilter, LastHashes};
use header::{Header as BlockHeader, BlockNumber};
use filter::Filter;
use log_entry::LocalizedLogEntry;
@ -28,6 +28,7 @@ use receipt::{Receipt, LocalizedReceipt};
use blockchain::extras::BlockReceipts;
use error::{ImportResult};
use evm::Factory as EvmFactory;
use miner::{Miner, MinerService};
use block_queue::BlockQueueInfo;
use block::{SealedBlock, ClosedBlock, LockedBlock};
@ -35,6 +36,9 @@ use executive::Executed;
use error::{ExecutionError};
use trace::LocalizedTrace;
use miner::{TransactionImportResult, AccountDetails};
use error::Error as EthError;
/// Test client.
pub struct TestBlockChainClient {
/// Blocks.
@ -61,6 +65,8 @@ pub struct TestBlockChainClient {
pub receipts: RwLock<HashMap<TransactionID, LocalizedReceipt>>,
/// Block queue size.
pub queue_size: AtomicUsize,
/// Miner
pub miner: Arc<Miner>,
}
#[derive(Clone)]
@ -99,6 +105,7 @@ impl TestBlockChainClient {
execution_result: RwLock::new(None),
receipts: RwLock::new(HashMap::new()),
queue_size: AtomicUsize::new(0),
miner: Arc::new(Miner::default()),
};
client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
client.genesis_hash = client.last_hash.read().unwrap().clone();
@ -232,6 +239,17 @@ impl TestBlockChainClient {
}
}
impl MiningBlockChainClient for TestBlockChainClient {
fn try_seal(&self, block: LockedBlock, _seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock> {
Err(block)
}
fn prepare_sealing(&self, _author: Address, _gas_floor_target: U256, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> (Option<ClosedBlock>, HashSet<H256>) {
(None, HashSet::new())
}
}
impl BlockChainClient for TestBlockChainClient {
fn call(&self, _t: &SignedTransaction) -> Result<Executed, ExecutionError> {
Ok(self.execution_result.read().unwrap().clone().unwrap())
@ -296,14 +314,6 @@ impl BlockChainClient for TestBlockChainClient {
unimplemented!();
}
fn prepare_sealing(&self, _author: Address, _gas_floor_target: U256, _extra_data: Bytes, _transactions: Vec<SignedTransaction>) -> (Option<ClosedBlock>, HashSet<H256>) {
(None, HashSet::new())
}
fn try_seal(&self, block: LockedBlock, _seal: Vec<Bytes>) -> Result<SealedBlock, LockedBlock> {
Err(block)
}
fn block_header(&self, id: BlockID) -> Option<Bytes> {
self.block_hash(id).and_then(|hash| self.blocks.read().unwrap().get(&hash).map(|r| Rlp::new(r).at(0).as_raw().to_vec()))
}
@ -476,4 +486,19 @@ impl BlockChainClient for TestBlockChainClient {
fn block_traces(&self, _trace: BlockID) -> Option<Vec<LocalizedTrace>> {
unimplemented!();
}
fn import_transactions(&self, transactions: Vec<SignedTransaction>) -> Vec<Result<TransactionImportResult, EthError>> {
let nonces = self.nonces.read().unwrap();
let balances = self.balances.read().unwrap();
let fetch_account = |a: &Address| AccountDetails {
nonce: nonces[a],
balance: balances[a],
};
self.miner.import_transactions(transactions, &fetch_account)
}
fn all_transactions(&self) -> Vec<SignedTransaction> {
self.miner.all_transactions()
}
}

View File

@ -22,6 +22,7 @@ use tests::helpers::*;
use devtools::*;
use spec::Genesis;
use ethjson;
use miner::Miner;
pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec<String> {
init_log();
@ -53,7 +54,7 @@ pub fn json_chain_test(json_data: &[u8], era: ChainEra) -> Vec<String> {
let temp = RandomTempPath::new();
{
let client = Client::new(ClientConfig::default(), spec, temp.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), spec, temp.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
for b in &blockchain.blocks_rlp() {
if Block::is_good(&b) {
let _ = client.import_block(b.clone());

View File

@ -90,6 +90,7 @@ extern crate crossbeam;
extern crate ethjson;
extern crate bloomchain;
#[macro_use] extern crate ethcore_ipc as ipc;
extern crate rayon;
#[cfg(test)] extern crate ethcore_devtools as devtools;
#[cfg(feature = "jit" )] extern crate evmjit;
@ -109,6 +110,7 @@ pub mod views;
pub mod pod_state;
pub mod engine;
pub mod migrations;
pub mod miner;
mod blooms;
mod db;

View File

@ -18,17 +18,17 @@ use rayon::prelude::*;
use std::sync::atomic::AtomicBool;
use util::*;
use util::keys::store::AccountProvider;
use ethcore::views::{BlockView, HeaderView};
use ethcore::client::{BlockChainClient, BlockID};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::error::*;
use ethcore::client::{Executive, Executed, EnvInfo, TransactOptions};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethcore::spec::Spec;
use ethcore::engine::Engine;
use super::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
use util::keys::store::{AccountProvider};
use views::{BlockView, HeaderView};
use client::{MiningBlockChainClient, BlockID};
use block::{ClosedBlock, IsBlock};
use error::*;
use client::{Executive, Executed, EnvInfo, TransactOptions};
use transaction::SignedTransaction;
use receipt::{Receipt};
use spec::Spec;
use engine::Engine;
use miner::{MinerService, MinerStatus, TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
/// Keeps track of transactions using priority queue and holds currently mined block.
pub struct Miner {
@ -104,7 +104,7 @@ impl Miner {
/// Prepares new block for sealing including top transactions from queue.
#[cfg_attr(feature="dev", allow(match_same_arms))]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn prepare_sealing(&self, chain: &BlockChainClient) {
fn prepare_sealing(&self, chain: &MiningBlockChainClient) {
trace!(target: "miner", "prepare_sealing: entering");
let transactions = self.transaction_queue.lock().unwrap().top_transactions();
let mut sealing_work = self.sealing_work.lock().unwrap();
@ -205,14 +205,14 @@ impl Miner {
trace!(target: "miner", "prepare_sealing: leaving (last={:?})", sealing_work.peek_last_ref().map(|b| b.block().fields().header.hash()));
}
fn update_gas_limit(&self, chain: &BlockChainClient) {
fn update_gas_limit(&self, chain: &MiningBlockChainClient) {
let gas_limit = HeaderView::new(&chain.best_block_header()).gas_limit();
let mut queue = self.transaction_queue.lock().unwrap();
queue.set_gas_limit(gas_limit);
}
/// Returns true if we had to prepare new pending block
fn enable_and_prepare_sealing(&self, chain: &BlockChainClient) -> bool {
fn enable_and_prepare_sealing(&self, chain: &MiningBlockChainClient) -> bool {
trace!(target: "miner", "enable_and_prepare_sealing: entering");
let have_work = self.sealing_work.lock().unwrap().peek_last_ref().is_some();
trace!(target: "miner", "enable_and_prepare_sealing: have_work={}", have_work);
@ -236,7 +236,7 @@ const SEALING_TIMEOUT_IN_BLOCKS : u64 = 5;
impl MinerService for Miner {
fn clear_and_reset(&self, chain: &BlockChainClient) {
fn clear_and_reset(&self, chain: &MiningBlockChainClient) {
self.transaction_queue.lock().unwrap().clear();
self.update_sealing(chain);
}
@ -251,7 +251,7 @@ impl MinerService for Miner {
}
}
fn call(&self, chain: &BlockChainClient, t: &SignedTransaction) -> Result<Executed, ExecutionError> {
fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction) -> Result<Executed, ExecutionError> {
let sealing_work = self.sealing_work.lock().unwrap();
match sealing_work.peek_last_ref() {
Some(work) => {
@ -287,7 +287,7 @@ impl MinerService for Miner {
}
}
fn balance(&self, chain: &BlockChainClient, address: &Address) -> U256 {
fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock().unwrap();
sealing_work.peek_last_ref().map_or_else(
|| chain.latest_balance(address),
@ -295,7 +295,7 @@ impl MinerService for Miner {
)
}
fn storage_at(&self, chain: &BlockChainClient, address: &Address, position: &H256) -> H256 {
fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 {
let sealing_work = self.sealing_work.lock().unwrap();
sealing_work.peek_last_ref().map_or_else(
|| chain.latest_storage_at(address, position),
@ -303,12 +303,12 @@ impl MinerService for Miner {
)
}
fn nonce(&self, chain: &BlockChainClient, address: &Address) -> U256 {
fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256 {
let sealing_work = self.sealing_work.lock().unwrap();
sealing_work.peek_last_ref().map_or_else(|| chain.latest_nonce(address), |b| b.block().fields().state.nonce(address))
}
fn code(&self, chain: &BlockChainClient, address: &Address) -> Option<Bytes> {
fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes> {
let sealing_work = self.sealing_work.lock().unwrap();
sealing_work.peek_last_ref().map_or_else(|| chain.code(address), |b| b.block().fields().state.code(address))
}
@ -375,7 +375,7 @@ impl MinerService for Miner {
.collect()
}
fn import_own_transaction<T>(&self, chain: &BlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails {
let hash = transaction.hash();
@ -469,7 +469,7 @@ impl MinerService for Miner {
self.transaction_queue.lock().unwrap().last_nonce(address)
}
fn update_sealing(&self, chain: &BlockChainClient) {
fn update_sealing(&self, chain: &MiningBlockChainClient) {
if self.sealing_enabled.load(atomic::Ordering::Relaxed) {
let current_no = chain.chain_info().best_block_number;
let has_local_transactions = self.transaction_queue.lock().unwrap().has_local_pending_transactions();
@ -489,7 +489,7 @@ impl MinerService for Miner {
}
}
fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
fn map_sealing_work<F, T>(&self, chain: &MiningBlockChainClient, f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
trace!(target: "miner", "map_sealing_work: entering");
self.enable_and_prepare_sealing(chain);
trace!(target: "miner", "map_sealing_work: sealing prepared");
@ -499,7 +499,7 @@ impl MinerService for Miner {
ret.map(f)
}
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error> {
if let Some(b) = self.sealing_work.lock().unwrap().take_used_if(|b| &b.hash() == &pow_hash) {
match chain.try_seal(b.lock(), seal) {
Err(_) => {
@ -522,8 +522,8 @@ impl MinerService for Miner {
}
}
fn chain_new_blocks(&self, chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockID::Hash(*hash))
// Client should send message after commit to db and inserting to chain.
@ -584,13 +584,13 @@ impl MinerService for Miner {
#[cfg(test)]
mod tests {
use MinerService;
use super::{Miner};
use super::super::MinerService;
use super::Miner;
use util::*;
use ethcore::client::{TestBlockChainClient, EachBlockWith};
use ethcore::block::*;
use client::{TestBlockChainClient, EachBlockWith};
use block::*;
// TODO [ToDr] To uncomment when TestBlockChainClient can actually return a ClosedBlock.
// TODO [ToDr] To uncomment` when TestBlockChainClient can actually return a ClosedBlock.
#[ignore]
#[test]
fn should_prepare_block_to_seal() {

View File

@ -26,12 +26,10 @@
//! ```rust
//! extern crate ethcore_util as util;
//! extern crate ethcore;
//! extern crate ethminer;
//! use std::env;
//! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::{Client, ClientConfig};
//! use ethcore::ethereum;
//! use ethminer::{Miner, MinerService};
//! use ethcore::miner::{Miner, MinerService};
//!
//! fn main() {
//! let miner: Miner = Miner::default();
@ -43,30 +41,21 @@
//! }
//! ```
#[macro_use]
extern crate log;
#[macro_use]
extern crate ethcore_util as util;
extern crate ethcore;
extern crate env_logger;
extern crate rayon;
mod miner;
mod external;
mod transaction_queue;
pub use transaction_queue::{TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
pub use miner::{Miner};
pub use external::{ExternalMiner, ExternalMinerService};
pub use self::transaction_queue::{TransactionQueue, AccountDetails, TransactionImportResult, TransactionOrigin};
pub use self::miner::{Miner};
pub use self::external::{ExternalMiner, ExternalMinerService};
use std::collections::BTreeMap;
use util::{H256, U256, Address, Bytes};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::block::ClosedBlock;
use ethcore::receipt::Receipt;
use ethcore::error::{Error, ExecutionError};
use ethcore::transaction::SignedTransaction;
use client::{MiningBlockChainClient, Executed};
use block::ClosedBlock;
use receipt::Receipt;
use error::{Error, ExecutionError};
use transaction::SignedTransaction;
/// Miner client API
pub trait MinerService : Send + Sync {
@ -110,7 +99,7 @@ pub trait MinerService : Send + Sync {
where T: Fn(&Address) -> AccountDetails, Self: Sized;
/// Imports own (node owner) transaction to queue.
fn import_own_transaction<T>(&self, chain: &BlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails, Self: Sized;
@ -118,20 +107,20 @@ pub trait MinerService : Send + Sync {
fn pending_transactions_hashes(&self) -> Vec<H256>;
/// Removes all transactions from the queue and restart mining operation.
fn clear_and_reset(&self, chain: &BlockChainClient);
fn clear_and_reset(&self, chain: &MiningBlockChainClient);
/// Called when blocks are imported to chain, updates transactions queue.
fn chain_new_blocks(&self, chain: &BlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]);
fn chain_new_blocks(&self, chain: &MiningBlockChainClient, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]);
/// New chain head event. Restart mining operation.
fn update_sealing(&self, chain: &BlockChainClient);
fn update_sealing(&self, chain: &MiningBlockChainClient);
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, chain: &BlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error>;
fn submit_seal(&self, chain: &MiningBlockChainClient, pow_hash: H256, seal: Vec<Bytes>) -> Result<(), Error>;
/// Get the sealing work package and if `Some`, apply some transform.
fn map_sealing_work<F, T>(&self, chain: &BlockChainClient, f: F) -> Option<T>
fn map_sealing_work<F, T>(&self, chain: &MiningBlockChainClient, f: F) -> Option<T>
where F: FnOnce(&ClosedBlock) -> T, Self: Sized;
/// Query pending transactions for hash.
@ -156,19 +145,19 @@ pub trait MinerService : Send + Sync {
fn sensible_gas_limit(&self) -> U256 { 21000.into() }
/// Latest account balance in pending state.
fn balance(&self, chain: &BlockChainClient, address: &Address) -> U256;
fn balance(&self, chain: &MiningBlockChainClient, address: &Address) -> U256;
/// Call into contract code using pending state.
fn call(&self, chain: &BlockChainClient, t: &SignedTransaction) -> Result<Executed, ExecutionError>;
fn call(&self, chain: &MiningBlockChainClient, t: &SignedTransaction) -> Result<Executed, ExecutionError>;
/// Get storage value in pending state.
fn storage_at(&self, chain: &BlockChainClient, address: &Address, position: &H256) -> H256;
fn storage_at(&self, chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256;
/// Get account nonce in pending state.
fn nonce(&self, chain: &BlockChainClient, address: &Address) -> U256;
fn nonce(&self, chain: &MiningBlockChainClient, address: &Address) -> U256;
/// Get contract code in pending state.
fn code(&self, chain: &BlockChainClient, address: &Address) -> Option<Bytes>;
fn code(&self, chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes>;
}
/// Mining status

View File

@ -26,13 +26,12 @@
//! ```rust
//! extern crate ethcore_util as util;
//! extern crate ethcore;
//! extern crate ethminer;
//! extern crate rustc_serialize;
//!
//! use util::crypto::KeyPair;
//! use util::hash::Address;
//! use util::numbers::{Uint, U256};
//! use ethminer::{TransactionQueue, AccountDetails, TransactionOrigin};
//! use ethcore::miner::{TransactionQueue, AccountDetails, TransactionOrigin};
//! use ethcore::transaction::*;
//! use rustc_serialize::hex::FromHex;
//!
@ -89,8 +88,8 @@ use std::collections::{HashMap, BTreeSet};
use util::numbers::{Uint, U256};
use util::hash::{Address, H256};
use util::table::*;
use ethcore::transaction::*;
use ethcore::error::{Error, TransactionError};
use transaction::*;
use error::{Error, TransactionError};
/// Transaction origin
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -778,8 +777,8 @@ mod test {
extern crate rustc_serialize;
use util::table::*;
use util::*;
use ethcore::transaction::*;
use ethcore::error::{Error, TransactionError};
use transaction::*;
use error::{Error, TransactionError};
use super::*;
use super::{TransactionSet, TransactionOrder, VerifiedTransaction};

View File

@ -21,6 +21,7 @@ use util::panics::*;
use spec::Spec;
use error::*;
use client::{Client, ClientConfig};
use miner::Miner;
/// Message type for external and internal events
#[derive(Clone)]
@ -54,14 +55,14 @@ pub struct ClientService {
impl ClientService {
/// Start the service in a separate thread.
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path) -> Result<ClientService, Error> {
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config));
panic_handler.forward_from(&net_service);
info!("Starting {}", net_service.host_info());
info!("Configured for {} using {:?} engine", spec.name, spec.engine.name());
let client = try!(Client::new(config, spec, db_path, net_service.io().channel()));
let client = try!(Client::new(config, spec, db_path, miner, net_service.io().channel()));
panic_handler.forward_from(client.deref());
let client_io = Arc::new(ClientIoHandler {
client: client.clone()
@ -141,12 +142,14 @@ mod tests {
use util::network::*;
use devtools::*;
use client::ClientConfig;
use std::sync::Arc;
use miner::Miner;
#[test]
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path());
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()));
assert!(service.is_ok());
}
}

View File

@ -14,16 +14,17 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use client::{BlockChainClient, Client, ClientConfig, BlockID};
use client::{BlockChainClient, MiningBlockChainClient, Client, ClientConfig, BlockID};
use block::IsBlock;
use tests::helpers::*;
use common::*;
use devtools::*;
use miner::Miner;
#[test]
fn imports_from_empty() {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
client.import_verified_blocks(&IoChannel::disconnected());
client.flush_queue();
}
@ -41,7 +42,7 @@ fn returns_state_root_basic() {
#[test]
fn imports_good_block() {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
let good_block = get_good_dummy_block();
if let Err(_) = client.import_block(good_block) {
panic!("error importing block being good by definition");
@ -56,7 +57,7 @@ fn imports_good_block() {
#[test]
fn query_none_block() {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
let non_existant = client.block_header(BlockID::Number(188));
assert!(non_existant.is_none());

View File

@ -23,6 +23,7 @@ use evm::Schedule;
use engine::*;
use ethereum;
use devtools::*;
use miner::Miner;
#[cfg(feature = "json-tests")]
pub enum ChainEra {
@ -139,7 +140,7 @@ pub fn create_test_block_with_data(header: &Header, transactions: &[&SignedTrans
pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult<Arc<Client>> {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
let test_spec = get_test_spec();
let test_engine = &test_spec.engine;
let state_root = test_spec.genesis_header().state_root;
@ -205,7 +206,7 @@ pub fn push_blocks_to_client(client: &Arc<Client>, timestamp_salt: u64, starting
pub fn get_test_client_with_blocks(blocks: Vec<Bytes>) -> GuardedTempResult<Arc<Client>> {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap();
let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), Arc::new(Miner::default()), IoChannel::disconnected()).unwrap();
for block in &blocks {
if let Err(_) = client.import_block(block.clone()) {
panic!("panic importing block which is well-formed");

View File

@ -1,24 +0,0 @@
[package]
description = "Ethminer library"
homepage = "http://ethcore.io"
license = "GPL-3.0"
name = "ethminer"
version = "1.2.0"
authors = ["Ethcore <admin@ethcore.io>"]
build = "build.rs"
[build-dependencies]
rustc_version = "0.1"
[dependencies]
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
log = "0.3"
env_logger = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"
clippy = { version = "0.0.69", optional = true}
[features]
default = []
dev = ["clippy"]

View File

@ -27,7 +27,6 @@ extern crate rustc_serialize;
extern crate ethcore_util as util;
extern crate ethcore;
extern crate ethsync;
extern crate ethminer;
#[macro_use]
extern crate log as rlog;
extern crate env_logger;
@ -86,7 +85,7 @@ use ethcore::error::{Error, ImportError};
use ethcore::service::ClientService;
use ethcore::spec::Spec;
use ethsync::EthSync;
use ethminer::{Miner, MinerService, ExternalMiner};
use ethcore::miner::{Miner, MinerService, ExternalMiner};
use daemonize::Daemonize;
use migration::migrate;
use informant::Informant;
@ -174,14 +173,6 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
// Secret Store
let account_service = Arc::new(conf.account_service());
// Build client
let mut service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path())
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);
let client = service.client();
// Miner
let miner = Miner::with_accounts(conf.args.flag_force_sealing, conf.spec(), account_service.clone());
miner.set_author(conf.author());
@ -190,11 +181,19 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
miner.set_minimal_gas_price(conf.gas_price());
miner.set_transactions_limit(conf.args.flag_tx_limit);
// Build client
let mut service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone()
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);
let client = service.client();
let external_miner = Arc::new(ExternalMiner::default());
let network_settings = Arc::new(conf.network_settings());
// Sync
let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone());
let sync = EthSync::register(service.network(), sync_config, client.clone());
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_enabled: conf.args.flag_signer,
@ -292,7 +291,7 @@ fn execute_export(conf: Configuration) {
// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path())
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);
@ -363,7 +362,7 @@ fn execute_import(conf: Configuration) {
// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path())
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);

View File

@ -20,7 +20,7 @@ use std::sync::Arc;
use die::*;
use ethsync::EthSync;
use ethminer::{Miner, ExternalMiner};
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::client::Client;
use util::RotatingLogger;
use util::keys::store::AccountService;

View File

@ -18,7 +18,6 @@ ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
ethash = { path = "../ethash" }
ethsync = { path = "../sync" }
ethminer = { path = "../miner" }
ethjson = { path = "../json" }
ethcore-devtools = { path = "../devtools" }
rustc-serialize = "0.3"
@ -34,4 +33,4 @@ syntex = "^0.32.0"
[features]
default = ["serde_codegen"]
nightly = ["serde_macros"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethminer/dev"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"]

View File

@ -29,7 +29,6 @@ extern crate jsonrpc_http_server;
extern crate ethcore_util as util;
extern crate ethcore;
extern crate ethsync;
extern crate ethminer;
extern crate transient_hashmap;
extern crate json_ipc_server as ipc;

View File

@ -21,13 +21,13 @@ extern crate ethash;
use std::sync::{Arc, Weak, Mutex};
use std::ops::Deref;
use ethsync::{SyncProvider, SyncState};
use ethminer::{MinerService, ExternalMinerService};
use ethcore::miner::{MinerService, ExternalMinerService};
use jsonrpc_core::*;
use util::numbers::*;
use util::sha3::*;
use util::rlp::{encode, decode, UntrustedRlp, View};
use util::keys::store::AccountProvider;
use ethcore::client::{BlockChainClient, BlockID, TransactionID, UncleID};
use ethcore::client::{MiningBlockChainClient, BlockID, TransactionID, UncleID};
use ethcore::block::IsBlock;
use ethcore::views::*;
use ethcore::ethereum::Ethash;
@ -42,7 +42,7 @@ use serde;
/// Eth rpc implementation.
pub struct EthClient<C, S, A, M, EM> where
C: BlockChainClient,
C: MiningBlockChainClient,
S: SyncProvider,
A: AccountProvider,
M: MinerService,
@ -57,7 +57,7 @@ pub struct EthClient<C, S, A, M, EM> where
}
impl<C, S, A, M, EM> EthClient<C, S, A, M, EM> where
C: BlockChainClient,
C: MiningBlockChainClient,
S: SyncProvider,
A: AccountProvider,
M: MinerService,
@ -222,7 +222,7 @@ fn make_unsupported_err() -> Error {
}
impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
C: BlockChainClient + 'static,
C: MiningBlockChainClient + 'static,
S: SyncProvider + 'static,
A: AccountProvider + 'static,
M: MinerService + 'static,

View File

@ -21,7 +21,7 @@ use std::sync::{Arc, Weak, Mutex};
use std::collections::HashSet;
use jsonrpc_core::*;
use util::numbers::*;
use ethminer::MinerService;
use ethcore::miner::MinerService;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::client::{BlockChainClient, BlockID};
use v1::traits::EthFilter;

View File

@ -18,8 +18,8 @@
use std::sync::{Arc, Weak};
use jsonrpc_core::*;
use ethminer::MinerService;
use ethcore::client::BlockChainClient;
use ethcore::miner::MinerService;
use ethcore::client::MiningBlockChainClient;
use util::numbers::*;
use util::keys::store::AccountProvider;
use v1::helpers::{SigningQueue, ConfirmationsQueue};
@ -62,7 +62,7 @@ impl EthSigning for EthSigningQueueClient {
/// Implementation of functions that require signing when no trusted signer is used.
pub struct EthSigningUnsafeClient<C, A, M> where
C: BlockChainClient,
C: MiningBlockChainClient,
A: AccountProvider,
M: MinerService {
client: Weak<C>,
@ -71,7 +71,7 @@ pub struct EthSigningUnsafeClient<C, A, M> where
}
impl<C, A, M> EthSigningUnsafeClient<C, A, M> where
C: BlockChainClient,
C: MiningBlockChainClient,
A: AccountProvider,
M: MinerService {
@ -87,7 +87,7 @@ impl<C, A, M> EthSigningUnsafeClient<C, A, M> where
}
impl<C, A, M> EthSigning for EthSigningUnsafeClient<C, A, M> where
C: BlockChainClient + 'static,
C: MiningBlockChainClient + 'static,
A: AccountProvider + 'static,
M: MinerService + 'static {

View File

@ -22,7 +22,7 @@ use std::sync::{Arc, Weak};
use std::ops::Deref;
use std::collections::BTreeMap;
use jsonrpc_core::*;
use ethminer::MinerService;
use ethcore::miner::MinerService;
use v1::traits::Ethcore;
use v1::types::Bytes;

View File

@ -52,15 +52,15 @@ pub use self::traces::TracesClient;
pub use self::rpc::RpcClient;
use v1::types::TransactionRequest;
use ethminer::{AccountDetails, MinerService};
use ethcore::client::BlockChainClient;
use ethcore::miner::{AccountDetails, MinerService};
use ethcore::client::MiningBlockChainClient;
use ethcore::transaction::{Action, SignedTransaction, Transaction};
use util::numbers::*;
use util::rlp::encode;
use util::bytes::ToPretty;
fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: SignedTransaction) -> H256
where C: BlockChainClient, M: MinerService {
where C: MiningBlockChainClient, M: MinerService {
let hash = signed_transaction.hash();
let import = miner.import_own_transaction(client, signed_transaction, |a: &Address| {
@ -74,7 +74,7 @@ fn dispatch_transaction<C, M>(client: &C, miner: &M, signed_transaction: SignedT
}
fn sign_and_dispatch<C, M>(client: &C, miner: &M, request: TransactionRequest, secret: H256) -> H256
where C: BlockChainClient, M: MinerService {
where C: MiningBlockChainClient, M: MinerService {
let signed_transaction = {
Transaction {

View File

@ -22,19 +22,19 @@ use v1::types::TransactionRequest;
use v1::impls::sign_and_dispatch;
use util::keys::store::AccountProvider;
use util::numbers::*;
use ethcore::client::BlockChainClient;
use ethminer::MinerService;
use ethcore::client::MiningBlockChainClient;
use ethcore::miner::MinerService;
/// Account management (personal) rpc implementation.
pub struct PersonalClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
accounts: Weak<A>,
client: Weak<C>,
miner: Weak<M>,
}
impl<A, C, M> PersonalClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
/// Creates new PersonalClient
pub fn new(store: &Arc<A>, client: &Arc<C>, miner: &Arc<M>) -> Self {
PersonalClient {
@ -46,7 +46,7 @@ impl<A, C, M> PersonalClient<A, C, M>
}
impl<A: 'static, C: 'static, M: 'static> Personal for PersonalClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
fn accounts(&self, _: Params) -> Result<Value, Error> {
let store = take_weak!(self.accounts);
match store.accounts() {

View File

@ -24,12 +24,12 @@ use v1::impls::sign_and_dispatch;
use v1::helpers::{SigningQueue, ConfirmationsQueue};
use util::keys::store::AccountProvider;
use util::numbers::*;
use ethcore::client::BlockChainClient;
use ethminer::MinerService;
use ethcore::client::MiningBlockChainClient;
use ethcore::miner::MinerService;
/// Transactions confirmation (personal) rpc implementation.
pub struct SignerClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
queue: Weak<ConfirmationsQueue>,
accounts: Weak<A>,
client: Weak<C>,
@ -37,7 +37,7 @@ pub struct SignerClient<A, C, M>
}
impl<A: 'static, C: 'static, M: 'static> SignerClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
/// Create new instance of signer client.
pub fn new(store: &Arc<A>, client: &Arc<C>, miner: &Arc<M>, queue: &Arc<ConfirmationsQueue>) -> Self {
@ -51,7 +51,7 @@ impl<A: 'static, C: 'static, M: 'static> SignerClient<A, C, M>
}
impl<A: 'static, C: 'static, M: 'static> PersonalSigner for SignerClient<A, C, M>
where A: AccountProvider, C: BlockChainClient, M: MinerService {
where A: AccountProvider, C: MiningBlockChainClient, M: MinerService {
fn transactions_to_confirm(&self, _params: Params) -> Result<Value, Error> {
let queue = take_weak!(self.queue);

View File

@ -19,13 +19,14 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::str::FromStr;
use ethcore::client::{MiningBlockChainClient, BlockChainClient, Client, ClientConfig};
use ethcore::ids::BlockID;
use ethcore::client::{Client, BlockChainClient, ClientConfig};
use ethcore::spec::{Genesis, Spec};
use ethcore::block::Block;
use ethcore::views::BlockView;
use ethcore::ethereum;
use ethminer::{Miner, MinerService, ExternalMiner};
use ethcore::transaction::{Transaction, Action};
use ethcore::miner::{MinerService, ExternalMiner, Miner};
use devtools::RandomTempPath;
use util::Hashable;
use util::io::IoChannel;
@ -68,8 +69,8 @@ fn make_spec(chain: &BlockChain) -> Spec {
}
struct EthTester {
_miner: Arc<MinerService>,
client: Arc<Client>,
_miner: Arc<MinerService>,
accounts: Arc<TestAccountProvider>,
handler: IoHandler,
}
@ -96,10 +97,10 @@ impl EthTester {
where F: Fn() -> Spec {
let dir = RandomTempPath::new();
let client = Client::new(ClientConfig::default(), spec_provider(), dir.as_path(), IoChannel::disconnected()).unwrap();
let sync_provider = sync_provider();
let account_provider = account_provider();
let miner_service = miner_service(spec_provider(), account_provider.clone());
let client = Client::new(ClientConfig::default(), spec_provider(), dir.as_path(), miner_service.clone(), IoChannel::disconnected()).unwrap();
let sync_provider = sync_provider();
let external_miner = Arc::new(ExternalMiner::default());
let eth_client = EthClient::new(

View File

@ -19,11 +19,11 @@
use util::{Address, H256, Bytes, U256, FixedHash, Uint};
use util::standard::*;
use ethcore::error::{Error, ExecutionError};
use ethcore::client::{BlockChainClient, Executed};
use ethcore::client::{MiningBlockChainClient, Executed};
use ethcore::block::{ClosedBlock, IsBlock};
use ethcore::transaction::SignedTransaction;
use ethcore::receipt::Receipt;
use ethminer::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};
use ethcore::miner::{MinerService, MinerStatus, AccountDetails, TransactionImportResult};
/// Test miner service.
pub struct TestMinerService {
@ -132,7 +132,7 @@ impl MinerService for TestMinerService {
}
/// Imports transactions to transaction queue.
fn import_own_transaction<T>(&self, chain: &BlockChainClient, transaction: SignedTransaction, _fetch_account: T) ->
fn import_own_transaction<T>(&self, chain: &MiningBlockChainClient, transaction: SignedTransaction, _fetch_account: T) ->
Result<TransactionImportResult, Error>
where T: Fn(&Address) -> AccountDetails {
@ -154,21 +154,21 @@ impl MinerService for TestMinerService {
}
/// Removes all transactions from the queue and restart mining operation.
fn clear_and_reset(&self, _chain: &BlockChainClient) {
fn clear_and_reset(&self, _chain: &MiningBlockChainClient) {
unimplemented!();
}
/// Called when blocks are imported to chain, updates transactions queue.
fn chain_new_blocks(&self, _chain: &BlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
fn chain_new_blocks(&self, _chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
unimplemented!();
}
/// New chain head event. Restart mining operation.
fn update_sealing(&self, _chain: &BlockChainClient) {
fn update_sealing(&self, _chain: &MiningBlockChainClient) {
unimplemented!();
}
fn map_sealing_work<F, T>(&self, _chain: &BlockChainClient, _f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
fn map_sealing_work<F, T>(&self, _chain: &MiningBlockChainClient, _f: F) -> Option<T> where F: FnOnce(&ClosedBlock) -> T {
unimplemented!();
}
@ -194,29 +194,29 @@ impl MinerService for TestMinerService {
/// Submit `seal` as a valid solution for the header of `pow_hash`.
/// Will check the seal, but not actually insert the block into the chain.
fn submit_seal(&self, _chain: &BlockChainClient, _pow_hash: H256, _seal: Vec<Bytes>) -> Result<(), Error> {
fn submit_seal(&self, _chain: &MiningBlockChainClient, _pow_hash: H256, _seal: Vec<Bytes>) -> Result<(), Error> {
unimplemented!();
}
fn balance(&self, _chain: &BlockChainClient, address: &Address) -> U256 {
fn balance(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 {
self.latest_closed_block.lock().unwrap().as_ref().map_or_else(U256::zero, |b| b.block().fields().state.balance(address).clone())
}
fn call(&self, _chain: &BlockChainClient, _t: &SignedTransaction) -> Result<Executed, ExecutionError> {
fn call(&self, _chain: &MiningBlockChainClient, _t: &SignedTransaction) -> Result<Executed, ExecutionError> {
unimplemented!();
}
fn storage_at(&self, _chain: &BlockChainClient, address: &Address, position: &H256) -> H256 {
fn storage_at(&self, _chain: &MiningBlockChainClient, address: &Address, position: &H256) -> H256 {
self.latest_closed_block.lock().unwrap().as_ref().map_or_else(H256::default, |b| b.block().fields().state.storage_at(address, position).clone())
}
fn nonce(&self, _chain: &BlockChainClient, address: &Address) -> U256 {
fn nonce(&self, _chain: &MiningBlockChainClient, address: &Address) -> U256 {
// we assume all transactions are in a pending block, ignoring the
// reality of gas limits.
self.last_nonce(address).unwrap_or(U256::zero())
}
fn code(&self, _chain: &BlockChainClient, address: &Address) -> Option<Bytes> {
fn code(&self, _chain: &MiningBlockChainClient, address: &Address) -> Option<Bytes> {
self.latest_closed_block.lock().unwrap().as_ref().map_or(None, |b| b.block().fields().state.code(address).clone())
}

View File

@ -25,7 +25,7 @@ use ethcore::client::{TestBlockChainClient, EachBlockWith, Executed, Transaction
use ethcore::log_entry::{LocalizedLogEntry, LogEntry};
use ethcore::receipt::LocalizedReceipt;
use ethcore::transaction::{Transaction, Action};
use ethminer::{ExternalMiner, MinerService};
use ethcore::miner::{ExternalMiner, MinerService};
use ethsync::SyncState;
use v1::{Eth, EthClient, EthSigning, EthSigningUnsafeClient};
use v1::tests::helpers::{TestSyncProvider, Config, TestMinerService};

View File

@ -18,7 +18,7 @@ use std::sync::Arc;
use std::str::FromStr;
use jsonrpc_core::IoHandler;
use v1::{Ethcore, EthcoreClient};
use ethminer::MinerService;
use ethcore::miner::MinerService;
use v1::tests::helpers::TestMinerService;
use util::numbers::*;
use rustc_serialize::hex::FromHex;

View File

@ -11,7 +11,6 @@ authors = ["Ethcore <admin@ethcore.io"]
ethcore-util = { path = "../util" }
ethcore = { path = "../ethcore" }
clippy = { version = "0.0.69", optional = true}
ethminer = { path = "../miner" }
log = "0.3"
env_logger = "0.3"
time = "0.1.34"
@ -20,4 +19,4 @@ heapsize = "0.3"
[features]
default = []
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethminer/dev"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev"]

View File

@ -97,7 +97,6 @@ use ethcore::client::{BlockChainClient, BlockStatus, BlockID, BlockChainInfo};
use ethcore::error::*;
use ethcore::transaction::SignedTransaction;
use ethcore::block::Block;
use ethminer::{Miner, MinerService, AccountDetails};
use io::SyncIo;
use time;
use super::SyncConfig;
@ -241,15 +240,13 @@ pub struct ChainSync {
imported_this_round: Option<usize>,
/// Network ID
network_id: U256,
/// Miner
miner: Arc<Miner>,
}
type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
impl ChainSync {
/// Create a new instance of syncing strategy.
pub fn new(config: SyncConfig, miner: Arc<Miner>, chain: &BlockChainClient) -> ChainSync {
pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync {
let chain = chain.chain_info();
let mut sync = ChainSync {
state: SyncState::ChainHead,
@ -265,7 +262,6 @@ impl ChainSync {
imported_this_round: None,
_max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
miner: miner,
};
sync.reset();
sync
@ -898,12 +894,7 @@ impl ChainSync {
let tx: SignedTransaction = try!(r.val_at(i));
transactions.push(tx);
}
let chain = io.chain();
let fetch_account = |a: &Address| AccountDetails {
nonce: chain.latest_nonce(a),
balance: chain.latest_balance(a),
};
let _ = self.miner.import_transactions(transactions, fetch_account);
let _ = io.chain().import_transactions(transactions);
Ok(())
}
@ -1226,7 +1217,7 @@ impl ChainSync {
return 0;
}
let mut transactions = self.miner.all_transactions();
let mut transactions = io.chain().all_transactions();
if transactions.is_empty() {
return 0;
}
@ -1276,11 +1267,9 @@ impl ChainSync {
self.check_resume(io);
}
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256]) {
if io.is_chain_queue_empty() {
// Notify miner
self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
// Propagate latests blocks
self.propagate_latest_blocks(io);
}
@ -1289,10 +1278,6 @@ impl ChainSync {
self.restart_on_bad_block(io);
}
}
pub fn chain_new_head(&mut self, io: &mut SyncIo) {
self.miner.update_sealing(io.chain());
}
}
#[cfg(test)]
@ -1305,8 +1290,7 @@ mod tests {
use ethcore::views::BlockView;
use ethcore::header::*;
use ethcore::client::*;
use ethcore::spec::Spec;
use ethminer::{Miner, MinerService};
use ethcore::miner::MinerService;
fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
let mut header = Header::new();
@ -1480,7 +1464,7 @@ mod tests {
}
fn dummy_sync_with_peer(peer_latest_hash: H256, client: &BlockChainClient) -> ChainSync {
let mut sync = ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test()), client);
let mut sync = ChainSync::new(SyncConfig::default(), client);
sync.peers.insert(0,
PeerInfo {
protocol_version: 0,
@ -1711,9 +1695,10 @@ mod tests {
{
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
assert_eq!(sync.miner.status().transactions_in_future_queue, 0);
assert_eq!(sync.miner.status().transactions_in_pending_queue, 1);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 1);
}
// We need to update nonce status (because we say that the block has been imported)
for h in &[good_blocks[0]] {
@ -1724,11 +1709,12 @@ mod tests {
{
let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &good_blocks, &retracted_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
}
// then
let status = sync.miner.status();
let status = client.miner.status();
assert_eq!(status.transactions_in_pending_queue, 1);
assert_eq!(status.transactions_in_future_queue, 0);
}
@ -1750,12 +1736,12 @@ mod tests {
// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
assert_eq!(sync.miner.status().transactions_in_future_queue, 0);
assert_eq!(sync.miner.status().transactions_in_pending_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_future_queue, 0);
assert_eq!(io.chain.miner.status().transactions_in_pending_queue, 0);
sync.chain_new_blocks(&mut io, &[], &[], &good_blocks, &retracted_blocks);
// then
let status = sync.miner.status();
let status = io.chain.miner.status();
assert_eq!(status.transactions_in_pending_queue, 0);
assert_eq!(status.transactions_in_future_queue, 0);
}

View File

@ -14,10 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethcore::client::BlockChainClient;
use util::{NetworkContext, PeerId, PacketId,};
use util::error::UtilError;
use ethcore::service::SyncMessage;
use ethcore::client::BlockChainClient;
/// IO interface for the syning handler.
/// Provides peer connection management and an interface to the blockchain client.

View File

@ -32,21 +32,20 @@
//! extern crate ethcore_util as util;
//! extern crate ethcore;
//! extern crate ethsync;
//! extern crate ethminer;
//! use std::env;
//! use std::sync::Arc;
//! use util::network::{NetworkService, NetworkConfiguration};
//! use ethcore::client::{Client, ClientConfig};
//! use ethsync::{EthSync, SyncConfig};
//! use ethminer::Miner;
//! use ethcore::ethereum;
//! use ethcore::miner::Miner;
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, service.io().channel()).unwrap();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
//! let miner = Miner::new(false, ethereum::new_frontier());
//! EthSync::register(&mut service, SyncConfig::default(), client, miner);
//! EthSync::register(&mut service, SyncConfig::default(), client);
//! }
//! ```
@ -55,7 +54,6 @@ extern crate log;
#[macro_use]
extern crate ethcore_util as util;
extern crate ethcore;
extern crate ethminer;
extern crate env_logger;
extern crate time;
extern crate rand;
@ -69,7 +67,6 @@ use util::TimerToken;
use util::{U256, ONE_U256};
use ethcore::client::Client;
use ethcore::service::SyncMessage;
use ethminer::Miner;
use io::NetSyncIo;
use chain::ChainSync;
@ -115,8 +112,8 @@ pub use self::chain::{SyncStatus, SyncState};
impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>, miner: Arc<Miner>) -> Arc<EthSync> {
let sync = ChainSync::new(config, miner, chain.deref());
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = ChainSync::new(config, chain.deref());
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(sync),
@ -171,10 +168,6 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted);
},
SyncMessage::NewChainHead => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_head(&mut sync_io);
},
_ => {/* Ignore other messages */},
}
}

View File

@ -16,10 +16,8 @@
use util::*;
use ethcore::client::{TestBlockChainClient, BlockChainClient};
use ethcore::spec::Spec;
use io::SyncIo;
use chain::ChainSync;
use ethminer::Miner;
use ::SyncConfig;
pub struct TestIo<'p> {
@ -93,7 +91,7 @@ impl TestNet {
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test()), &chain);
let sync = ChainSync::new(SyncConfig::default(), &chain);
net.peers.push(TestPeer {
sync: sync,
chain: chain,

View File

@ -10,5 +10,4 @@ cargo test --features ethcore/json-tests $1 \
-p ethcore-signer \
-p ethcore-dapps \
-p parity \
-p ethminer \
-p bigint

View File

@ -25,7 +25,7 @@ elastic-array = "0.4"
heapsize = "0.3"
itertools = "0.4"
crossbeam = "0.2"
slab = "0.1"
slab = "0.2"
sha3 = { path = "sha3" }
serde = "0.7.0"
clippy = { version = "0.0.69", optional = true}

View File

@ -170,16 +170,16 @@ impl Connection {
self.token
}
/// Replace socket token
pub fn set_token(&mut self, token: StreamToken) {
self.token = token;
}
/// Get remote peer address
pub fn remote_addr(&self) -> io::Result<SocketAddr> {
self.socket.peer_addr()
}
/// Get remote peer address string
pub fn remote_addr_str(&self) -> String {
self.socket.peer_addr().map(|a| a.to_string()).unwrap_or_else(|_| "Unknown".to_owned())
}
/// Clone this connection. Clears the receiving buffer of the returned connection.
pub fn try_clone(&self) -> io::Result<Self> {
Ok(Connection {
@ -196,7 +196,7 @@ impl Connection {
/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "network", "connection register; token={:?}", reg);
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) {
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */) { // TODO: oneshot is broken on windows
trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
}
Ok(())
@ -205,7 +205,7 @@ impl Connection {
/// Update connection registration. Should be called at the end of the IO handler.
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "network", "connection reregister; token={:?}", reg);
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() /* | PollOpt::oneshot() */ ).or_else(|e| { // TODO: oneshot is broken on windows
trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
Ok(())
})
@ -246,7 +246,7 @@ enum EncryptedConnectionState {
/// https://github.com/ethereum/devp2p/blob/master/rlpx.md#framing
pub struct EncryptedConnection {
/// Underlying tcp connection
connection: Connection,
pub connection: Connection,
/// Egress data encryptor
encoder: CtrMode<AesSafe256Encryptor>,
/// Ingress data decryptor
@ -266,27 +266,6 @@ pub struct EncryptedConnection {
}
impl EncryptedConnection {
/// Get socket token
pub fn token(&self) -> StreamToken {
self.connection.token
}
/// Replace socket token
pub fn set_token(&mut self, token: StreamToken) {
self.connection.set_token(token);
}
/// Get remote peer address
pub fn remote_addr(&self) -> io::Result<SocketAddr> {
self.connection.remote_addr()
}
/// Check if this connection has data to be sent.
pub fn is_sending(&self) -> bool {
self.connection.is_sending()
}
/// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> {
let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_ephemeral));
@ -323,8 +302,10 @@ impl EncryptedConnection {
ingress_mac.update(&mac_material);
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });
let old_connection = try!(handshake.connection.try_clone());
let connection = ::std::mem::replace(&mut handshake.connection, old_connection);
let mut enc = EncryptedConnection {
connection: try!(handshake.connection.try_clone()),
connection: connection,
encoder: encoder,
decoder: decoder,
mac_encoder: mac_encoder,
@ -463,24 +444,6 @@ impl EncryptedConnection {
try!(self.connection.writable());
Ok(())
}
/// Register socket with the event lpop. This should be called at the end of the event loop.
pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.register_socket(reg, event_loop));
Ok(())
}
/// Update connection registration. This should be called at the end of the event loop.
pub fn update_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.update_socket(reg, event_loop));
Ok(())
}
/// Delete connection registration. This should be called at the end of the event loop.
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.deregister_socket(event_loop));
Ok(())
}
}
#[test]

View File

@ -16,7 +16,6 @@
use std::sync::Arc;
use rand::random;
use mio::*;
use mio::tcp::*;
use hash::*;
use rlp::*;
@ -102,21 +101,6 @@ impl Handshake {
})
}
/// Get id of the remote node if known
pub fn id(&self) -> &NodeId {
&self.id
}
/// Get stream token id
pub fn token(&self) -> StreamToken {
self.connection.token()
}
/// Mark this handshake as inactive to be deleted lated.
pub fn set_expired(&mut self) {
self.expired = true;
}
/// Check if this handshake is expired.
pub fn expired(&self) -> bool {
self.expired
@ -177,7 +161,7 @@ impl Handshake {
}
/// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
pub fn writable<Message>(&mut self, io: &IoContext<Message>) -> Result<(), UtilError> where Message: Send + Clone {
if !self.expired() {
io.clear_timer(self.connection.token).unwrap();
try!(self.connection.writable());
@ -188,28 +172,6 @@ impl Handshake {
Ok(())
}
/// Register the socket with the event loop
pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
if !self.expired() {
try!(self.connection.register_socket(reg, event_loop));
}
Ok(())
}
/// Update socket registration with the event loop.
pub fn update_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
if !self.expired() {
try!(self.connection.update_socket(reg, event_loop));
}
Ok(())
}
/// Delete registration
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.deregister_socket(event_loop));
Ok(())
}
fn set_auth(&mut self, host_secret: &Secret, sig: &[u8], remote_public: &[u8], remote_nonce: &[u8], remote_version: u64) -> Result<(), UtilError> {
self.id.clone_from_slice(remote_public);
self.remote_nonce.clone_from_slice(remote_nonce);
@ -222,7 +184,7 @@ impl Handshake {
/// Parse, validate and confirm auth message
fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"network", "Received handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received handshake auth from {:?}", self.connection.remote_addr_str());
if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target:"net", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol));
@ -253,7 +215,7 @@ impl Handshake {
}
fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.auth_cipher.extend_from_slice(data);
let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..]));
let rlp = UntrustedRlp::new(&auth);
@ -268,7 +230,7 @@ impl Handshake {
/// Parse and validate ack message
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"network", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received handshake auth to {:?}", self.connection.remote_addr_str());
if data.len() != V4_ACK_PACKET_SIZE {
debug!(target:"net", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol));
@ -296,7 +258,7 @@ impl Handshake {
}
fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.remote_addr_str());
self.ack_cipher.extend_from_slice(data);
let ack = try!(ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..]));
let rlp = UntrustedRlp::new(&ack);
@ -309,7 +271,7 @@ impl Handshake {
/// Sends auth message
fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> {
trace!(target:"network", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending handshake auth to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len();
{
@ -336,7 +298,7 @@ impl Handshake {
/// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> {
trace!(target:"network", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending handshake ack to {:?}", self.connection.remote_addr_str());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len();
{
@ -355,7 +317,7 @@ impl Handshake {
/// Sends EIP8 ack message
fn write_ack_eip8(&mut self) -> Result<(), UtilError> {
trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.remote_addr_str());
let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public());
rlp.append(&self.nonce);

View File

@ -18,6 +18,7 @@ use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::str::{FromStr};
use std::sync::*;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
@ -31,7 +32,6 @@ use misc::version;
use crypto::*;
use sha3::Hashable;
use rlp::*;
use network::handshake::Handshake;
use network::session::{Session, SessionData};
use error::*;
use io::*;
@ -44,8 +44,7 @@ use network::ip_utils::{map_external_address, select_public_address};
type Slab<T> = ::slab::Slab<T, usize>;
const _DEFAULT_PORT: u16 = 30304;
const MAX_SESSIONS: usize = 1024;
const MAX_SESSIONS: usize = 1024 + MAX_HANDSHAKES;
const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAINTENANCE_TIMEOUT: u64 = 1000;
@ -115,18 +114,17 @@ impl NetworkConfiguration {
}
// Tokens
const TCP_ACCEPT: usize = LAST_HANDSHAKE + 1;
const IDLE: usize = LAST_HANDSHAKE + 2;
const DISCOVERY: usize = LAST_HANDSHAKE + 3;
const DISCOVERY_REFRESH: usize = LAST_HANDSHAKE + 4;
const DISCOVERY_ROUND: usize = LAST_HANDSHAKE + 5;
const INIT_PUBLIC: usize = LAST_HANDSHAKE + 6;
const NODE_TABLE: usize = LAST_HANDSHAKE + 7;
const TCP_ACCEPT: usize = SYS_TIMER + 1;
const IDLE: usize = SYS_TIMER + 2;
const DISCOVERY: usize = SYS_TIMER + 3;
const DISCOVERY_REFRESH: usize = SYS_TIMER + 4;
const DISCOVERY_ROUND: usize = SYS_TIMER + 5;
const INIT_PUBLIC: usize = SYS_TIMER + 6;
const NODE_TABLE: usize = SYS_TIMER + 7;
const FIRST_SESSION: usize = 0;
const LAST_SESSION: usize = FIRST_SESSION + MAX_SESSIONS - 1;
const FIRST_HANDSHAKE: usize = LAST_SESSION + 1;
const LAST_HANDSHAKE: usize = FIRST_HANDSHAKE + MAX_HANDSHAKES - 1;
const USER_TIMER: usize = LAST_HANDSHAKE + 256;
const USER_TIMER: usize = LAST_SESSION + 256;
const SYS_TIMER: usize = LAST_SESSION + 1;
/// Protocol handler level packet id
pub type PacketId = u8;
@ -306,7 +304,6 @@ impl HostInfo {
}
type SharedSession = Arc<Mutex<Session>>;
type SharedHandshake = Arc<Mutex<Handshake>>;
#[derive(Copy, Clone)]
struct ProtocolTimer {
@ -318,7 +315,6 @@ struct ProtocolTimer {
pub struct Host<Message> where Message: Send + Sync + Clone {
pub info: RwLock<HostInfo>,
tcp_listener: Mutex<TcpListener>,
handshakes: Arc<RwLock<Slab<SharedHandshake>>>,
sessions: Arc<RwLock<Slab<SharedSession>>>,
discovery: Mutex<Option<Discovery>>,
nodes: RwLock<NodeTable>,
@ -327,6 +323,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>,
pinned_nodes: Vec<NodeId>,
num_sessions: AtomicUsize,
}
impl<Message> Host<Message> where Message: Send + Sync + Clone {
@ -370,7 +367,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}),
discovery: Mutex::new(None),
tcp_listener: Mutex::new(tcp_listener),
handshakes: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_HANDSHAKE, MAX_HANDSHAKES))),
sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))),
nodes: RwLock::new(NodeTable::new(path)),
handlers: RwLock::new(HashMap::new()),
@ -378,6 +374,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
timer_counter: RwLock::new(USER_TIMER),
stats: Arc::new(NetworkStats::default()),
pinned_nodes: Vec::new(),
num_sessions: AtomicUsize::new(0),
};
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
@ -477,19 +474,19 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn have_session(&self, id: &NodeId) -> bool {
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id.eq(&id))
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().info.id == Some(id.clone()))
}
fn session_count(&self) -> usize {
self.sessions.read().unwrap().count()
self.num_sessions.load(AtomicOrdering::Relaxed)
}
fn connecting_to(&self, id: &NodeId) -> bool {
self.handshakes.read().unwrap().iter().any(|e| e.lock().unwrap().id.eq(&id))
self.sessions.read().unwrap().iter().any(|e| e.lock().unwrap().id() == Some(id))
}
fn handshake_count(&self) -> usize {
self.handshakes.read().unwrap().count()
self.sessions.read().unwrap().count() - self.session_count()
}
fn keep_alive(&self, io: &IoContext<NetworkIoMessage<Message>>) {
@ -565,21 +562,31 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
};
self.create_connection(socket, Some(id), io);
if let Err(e) = self.create_connection(socket, Some(id), io) {
debug!(target: "network", "Can't create connection: {:?}", e);
}
}
#[cfg_attr(feature="dev", allow(block_in_if_condition_stmt))]
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) {
fn create_connection(&self, socket: TcpStream, id: Option<&NodeId>, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
let nonce = self.info.write().unwrap().next_nonce();
let mut handshakes = self.handshakes.write().unwrap();
if handshakes.insert_with(|token| {
let mut handshake = Handshake::new(token, id, socket, &nonce, self.stats.clone()).expect("Can't create handshake");
handshake.start(io, &self.info.read().unwrap(), id.is_some()).and_then(|_| io.register_stream(token)).unwrap_or_else (|e| {
debug!(target: "network", "Handshake create error: {:?}", e);
});
Arc::new(Mutex::new(handshake))
}).is_none() {
debug!(target: "network", "Max handshakes reached");
let mut sessions = self.sessions.write().unwrap();
let token = sessions.insert_with_opt(|token| {
match Session::new(io, socket, token, id, &nonce, self.stats.clone(), &self.info.read().unwrap()) {
Ok(s) => Some(Arc::new(Mutex::new(s))),
Err(e) => {
debug!(target: "network", "Session create error: {:?}", e);
None
}
}
});
match token {
Some(t) => io.register_stream(t),
None => {
debug!(target: "network", "Max sessions reached");
Ok(())
}
}
}
@ -594,19 +601,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
break
},
};
self.create_connection(socket, None, io);
}
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
}
fn handshake_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let handshake = { self.handshakes.read().unwrap().get(token).cloned() };
if let Some(handshake) = handshake {
let mut h = handshake.lock().unwrap();
if let Err(e) = h.writable(io, &self.info.read().unwrap()) {
trace!(target: "network", "Handshake write error: {}: {:?}", token, e);
if let Err(e) = self.create_connection(socket, None, io) {
debug!(target: "network", "Can't accept connection: {:?}", e);
}
}
io.update_registration(TCP_ACCEPT).expect("Error registering TCP listener");
}
fn session_writable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
@ -629,30 +628,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.kill_connection(token, io, true);
}
fn handshake_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut create_session = false;
let mut kill = false;
let handshake = { self.handshakes.read().unwrap().get(token).cloned() };
if let Some(handshake) = handshake {
let mut h = handshake.lock().unwrap();
if let Err(e) = h.readable(io, &self.info.read().unwrap()) {
debug!(target: "network", "Handshake read error: {}: {:?}", token, e);
kill = true;
}
if h.done() {
create_session = true;
}
}
if kill {
self.kill_connection(token, io, true);
return;
} else if create_session {
self.start_session(token, io);
return;
}
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
}
fn session_readable(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut ready_data: Vec<ProtocolId> = Vec::new();
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
@ -662,17 +637,37 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut s = session.lock().unwrap();
match s.readable(io, &self.info.read().unwrap()) {
Err(e) => {
trace!(target: "network", "Session read error: {}:{} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
trace!(target: "network", "Session read error: {}:{:?} ({:?}) {:?}", token, s.id(), s.remote_addr(), e);
match e {
UtilError::Network(NetworkError::Disconnect(DisconnectReason::UselessPeer)) |
UtilError::Network(NetworkError::Disconnect(DisconnectReason::IncompatibleProtocol)) => {
self.nodes.write().unwrap().mark_as_useless(s.id());
if let Some(id) = s.id() {
self.nodes.write().unwrap().mark_as_useless(id);
}
}
_ => (),
}
kill = true;
},
Ok(SessionData::Ready) => {
if !s.info.originated {
let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
if session_count >= ideal_peers as usize {
s.disconnect(DisconnectReason::TooManyPeers);
return;
}
// Add it no node table
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock().unwrap();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry);
}
}
}
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
ready_data.push(p);
@ -697,6 +692,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
for p in ready_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
self.stats.inc_sessions();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token);
}
if let Some((p, packet_id, data)) = packet_data {
@ -706,59 +702,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
}
fn start_session(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
let mut handshakes = self.handshakes.write().unwrap();
if handshakes.get(token).is_none() {
return;
}
// turn a handshake into a session
let mut sessions = self.sessions.write().unwrap();
let mut h = handshakes.get_mut(token).unwrap().lock().unwrap();
if h.expired {
return;
}
io.deregister_stream(token).expect("Error deleting handshake registration");
h.set_expired();
let originated = h.originated;
let mut session = match Session::new(&mut h, &self.info.read().unwrap()) {
Ok(s) => s,
Err(e) => {
debug!(target: "network", "Session creation error: {:?}", e);
return;
}
};
if !originated {
let session_count = sessions.count();
let ideal_peers = { self.info.read().unwrap().deref().config.ideal_peers };
if session_count >= ideal_peers as usize {
session.disconnect(DisconnectReason::TooManyPeers);
return;
}
}
let result = sessions.insert_with(move |session_token| {
session.set_token(session_token);
io.register_stream(session_token).expect("Error creating session registration");
self.stats.inc_sessions();
trace!(target: "network", "Creating session {} -> {}:{} ({:?})", token, session_token, session.id(), session.remote_addr());
if !originated {
// Add it no node table
if let Ok(address) = session.remote_addr() {
let entry = NodeEntry { id: session.id().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
self.nodes.write().unwrap().add_node(Node::new(entry.id.clone(), entry.endpoint.clone()));
let mut discovery = self.discovery.lock().unwrap();
if let Some(ref mut discovery) = *discovery.deref_mut() {
discovery.add_node(entry);
}
}
}
Arc::new(Mutex::new(session))
});
if result.is_none() {
warn!("Max sessions reached");
}
}
fn connection_timeout(&self, token: StreamToken, io: &IoContext<NetworkIoMessage<Message>>) {
trace!(target: "network", "Connection timeout: {}", token);
self.kill_connection(token, io, true)
@ -770,17 +713,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut deregister = false;
let mut expired_session = None;
match token {
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let handshakes = self.handshakes.write().unwrap();
if let Some(handshake) = handshakes.get(token).cloned() {
let mut handshake = handshake.lock().unwrap();
if !handshake.expired() {
handshake.set_expired();
failure_id = Some(handshake.id().clone());
deregister = true;
}
}
},
FIRST_SESSION ... LAST_SESSION => {
let sessions = self.sessions.write().unwrap();
if let Some(session) = sessions.get(token).cloned() {
@ -790,12 +722,13 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if s.is_ready() {
for (p, _) in self.handlers.read().unwrap().iter() {
if s.have_capability(p) {
self.num_sessions.fetch_sub(1, AtomicOrdering::SeqCst);
to_disconnect.push(p);
}
}
}
s.set_expired();
failure_id = Some(s.id().clone());
failure_id = s.id().cloned();
}
deregister = remote || s.done();
}
@ -821,20 +754,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
fn update_nodes(&self, io: &IoContext<NetworkIoMessage<Message>>, node_changes: TableUpdates) {
let mut to_remove: Vec<PeerId> = Vec::new();
{
{
let handshakes = self.handshakes.write().unwrap();
for c in handshakes.iter() {
let h = c.lock().unwrap();
if node_changes.removed.contains(&h.id()) {
to_remove.push(h.token());
}
}
}
{
let sessions = self.sessions.write().unwrap();
for c in sessions.iter() {
let s = c.lock().unwrap();
if node_changes.removed.contains(&s.id()) {
let sessions = self.sessions.write().unwrap();
for c in sessions.iter() {
let s = c.lock().unwrap();
if let Some(id) = s.id() {
if node_changes.removed.contains(id) {
to_remove.push(s.token());
}
}
@ -860,7 +784,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
trace!(target: "network", "Hup: {}", stream);
match stream {
FIRST_SESSION ... LAST_SESSION => self.connection_closed(stream, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_closed(stream, io),
_ => warn!(target: "network", "Unexpected hup"),
};
}
@ -868,7 +791,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn stream_readable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_readable(stream, io),
DISCOVERY => {
let node_changes = { self.discovery.lock().unwrap().as_mut().unwrap().readable() };
if let Some(node_changes) = node_changes {
@ -884,7 +806,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.handshake_writable(stream, io),
DISCOVERY => {
self.discovery.lock().unwrap().as_mut().unwrap().writable();
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
@ -899,7 +820,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
warn!("Error initializing public interface: {:?}", e)),
FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io),
FIRST_HANDSHAKE ... LAST_HANDSHAKE => self.connection_timeout(token, io),
DISCOVERY_REFRESH => {
self.discovery.lock().unwrap().as_mut().unwrap().refresh();
io.update_registration(DISCOVERY).expect("Error updating discovery registration");
@ -966,7 +886,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
let session = { self.sessions.read().unwrap().get(*peer).cloned() };
if let Some(session) = session {
session.lock().unwrap().disconnect(DisconnectReason::DisconnectRequested);
self.nodes.write().unwrap().mark_as_useless(session.lock().unwrap().id());
if let Some(id) = session.lock().unwrap().id() {
self.nodes.write().unwrap().mark_as_useless(id)
}
}
trace!(target: "network", "Disabling peer {}", peer);
self.kill_connection(*peer, io, false);
@ -987,12 +909,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
session.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
}
}
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let connection = { self.handshakes.read().unwrap().get(stream).cloned() };
if let Some(connection) = connection {
connection.lock().unwrap().register_socket(reg, event_loop).expect("Error registering socket");
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().register_socket(event_loop).expect("Error registering discovery socket"),
TCP_ACCEPT => event_loop.register(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error registering stream"),
_ => warn!("Unexpected stream registration")
@ -1008,13 +924,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
connections.remove(stream);
}
}
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let mut connections = self.handshakes.write().unwrap();
if let Some(connection) = connections.get(stream).cloned() {
connection.lock().unwrap().deregister_socket(event_loop).expect("Error deregistering socket");
connections.remove(stream);
}
}
DISCOVERY => (),
_ => warn!("Unexpected stream deregistration")
}
@ -1028,12 +937,6 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
}
}
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let connection = { self.handshakes.read().unwrap().get(stream).cloned() };
if let Some(connection) = connection {
connection.lock().unwrap().update_socket(reg, event_loop).expect("Error updating socket");
}
}
DISCOVERY => self.discovery.lock().unwrap().as_ref().unwrap().update_registration(event_loop).expect("Error reregistering discovery socket"),
TCP_ACCEPT => event_loop.reregister(self.tcp_listener.lock().unwrap().deref(), Token(TCP_ACCEPT), EventSet::all(), PollOpt::edge()).expect("Error reregistering stream"),
_ => warn!("Unexpected stream update")

View File

@ -16,15 +16,19 @@
use std::net::SocketAddr;
use std::io;
use std::sync::*;
use mio::*;
use mio::tcp::*;
use rlp::*;
use network::connection::{EncryptedConnection, Packet};
use hash::*;
use network::connection::{EncryptedConnection, Packet, Connection};
use network::handshake::Handshake;
use error::*;
use io::{IoContext, StreamToken};
use network::error::{NetworkError, DisconnectReason};
use network::host::*;
use network::node_table::NodeId;
use network::stats::NetworkStats;
use time;
const PING_TIMEOUT_SEC: u64 = 30;
@ -36,14 +40,18 @@ const PING_INTERVAL_SEC: u64 = 30;
pub struct Session {
/// Shared session information
pub info: SessionInfo,
/// Underlying connection
connection: EncryptedConnection,
/// Session ready flag. Set after successfull Hello packet exchange
had_hello: bool,
/// Session is no longer active flag.
expired: bool,
ping_time_ns: u64,
pong_time_ns: Option<u64>,
state: State,
}
enum State {
Handshake(Handshake),
Session(EncryptedConnection),
}
/// Structure used to report various session events.
@ -65,7 +73,7 @@ pub enum SessionData {
/// Shared session information
pub struct SessionInfo {
/// Peer public key
pub id: NodeId,
pub id: Option<NodeId>,
/// Peer client ID
pub client_version: String,
/// Peer RLPx protocol version
@ -74,6 +82,8 @@ pub struct SessionInfo {
capabilities: Vec<SessionCapabilityInfo>,
/// Peer ping delay in milliseconds
pub ping_ms: Option<u64>,
/// True if this session was originated by us.
pub originated: bool,
}
#[derive(Debug, PartialEq, Eq)]
@ -112,31 +122,52 @@ const PACKET_LAST: u8 = 0x7f;
impl Session {
/// Create a new session out of comepleted handshake. This clones the handshake connection object
/// and leaves the handhsake in limbo to be deregistered from the event loop.
pub fn new(h: &mut Handshake, host: &HostInfo) -> Result<Session, UtilError> {
let id = h.id.clone();
let connection = try!(EncryptedConnection::new(h));
let mut session = Session {
connection: connection,
pub fn new<Message>(io: &IoContext<Message>, socket: TcpStream, token: StreamToken, id: Option<&NodeId>,
nonce: &H256, stats: Arc<NetworkStats>, host: &HostInfo) -> Result<Session, UtilError>
where Message: Send + Clone {
let originated = id.is_some();
let mut handshake = Handshake::new(token, id, socket, &nonce, stats).expect("Can't create handshake");
try!(handshake.start(io, host, originated));
Ok(Session {
state: State::Handshake(handshake),
had_hello: false,
info: SessionInfo {
id: id,
id: id.cloned(),
client_version: String::new(),
protocol_version: 0,
capabilities: Vec::new(),
ping_ms: None,
originated: originated,
},
ping_time_ns: 0,
pong_time_ns: None,
expired: false,
})
}
fn complete_handshake(&mut self, host: &HostInfo) -> Result<(), UtilError> {
let connection = if let State::Handshake(ref mut h) = self.state {
self.info.id = Some(h.id.clone());
try!(EncryptedConnection::new(h))
} else {
panic!("Unexpected state");
};
try!(session.write_hello(host));
try!(session.send_ping());
Ok(session)
self.state = State::Session(connection);
try!(self.write_hello(host));
try!(self.send_ping());
Ok(())
}
fn connection(&self) -> &Connection {
match self.state {
State::Handshake(ref h) => &h.connection,
State::Session(ref s) => &s.connection,
}
}
/// Get id of the remote peer
pub fn id(&self) -> &NodeId {
&self.info.id
pub fn id(&self) -> Option<&NodeId> {
self.info.id.as_ref()
}
/// Check if session is ready to send/receive data
@ -151,21 +182,20 @@ impl Session {
/// Check if this session is expired.
pub fn expired(&self) -> bool {
self.expired
match self.state {
State::Handshake(ref h) => h.expired(),
_ => self.expired,
}
}
/// Check if this session is over and there is nothing to be sent.
pub fn done(&self) -> bool {
self.expired() && !self.connection.is_sending()
}
/// Replace socket token
pub fn set_token(&mut self, token: StreamToken) {
self.connection.set_token(token);
self.expired() && !self.connection().is_sending()
}
/// Get remote peer address
pub fn remote_addr(&self) -> io::Result<SocketAddr> {
self.connection.remote_addr()
self.connection().remote_addr()
}
/// Readable IO handler. Returns packet data if available.
@ -173,15 +203,37 @@ impl Session {
if self.expired() {
return Ok(SessionData::None)
}
match try!(self.connection.readable(io)) {
Some(data) => Ok(try!(self.read_packet(data, host))),
None => Ok(SessionData::None)
let mut create_session = false;
let mut packet_data = None;
match self.state {
State::Handshake(ref mut h) => {
try!(h.readable(io, host));
if h.done() {
create_session = true;
}
}
State::Session(ref mut c) => {
match try!(c.readable(io)) {
data @ Some(_) => packet_data = data,
None => return Ok(SessionData::None)
}
}
}
if let Some(data) = packet_data {
return Ok(try!(self.read_packet(data, host)));
}
if create_session {
try!(self.complete_handshake(host));
}
Ok(SessionData::None)
}
/// Writable IO handler. Sends pending packets.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Sync + Clone {
self.connection.writable(io)
match self.state {
State::Handshake(ref mut h) => h.writable(io),
State::Session(ref mut s) => s.writable(io),
}
}
/// Checks if peer supports given capability
@ -194,18 +246,20 @@ impl Session {
if self.expired() {
return Ok(());
}
try!(self.connection.register_socket(reg, event_loop));
try!(self.connection().register_socket(reg, event_loop));
Ok(())
}
/// Update registration with the event loop. Should be called at the end of the IO handler.
pub fn update_socket<Host:Handler>(&self, reg:Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.update_socket(reg, event_loop)
try!(self.connection().update_socket(reg, event_loop));
Ok(())
}
/// Delete registration
pub fn deregister_socket<Host:Handler>(&self, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
self.connection.deregister_socket(event_loop)
try!(self.connection().deregister_socket(event_loop));
Ok(())
}
/// Send a protocol packet to peer.
@ -221,7 +275,7 @@ impl Session {
while protocol != self.info.capabilities[i].protocol {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unknown protocol: {:?}", protocol);
debug!(target: "network", "Unknown protocol: {:?}", protocol);
return Ok(())
}
}
@ -229,11 +283,14 @@ impl Session {
let mut rlp = RlpStream::new();
rlp.append(&(pid as u32));
rlp.append_raw(data, 1);
self.connection.send_packet(&rlp.out())
self.send(rlp)
}
/// Keep this session alive. Returns false if ping timeout happened
pub fn keep_alive<Message>(&mut self, io: &IoContext<Message>) -> bool where Message: Send + Sync + Clone {
if let State::Handshake(_) = self.state {
return true;
}
let timed_out = if let Some(pong) = self.pong_time_ns {
pong - self.ping_time_ns > PING_TIMEOUT_SEC * 1000_000_000
} else {
@ -244,13 +301,13 @@ impl Session {
if let Err(e) = self.send_ping() {
debug!("Error sending ping message: {:?}", e);
}
io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "net", "Session registration error: {:?}", e));
io.update_registration(self.token()).unwrap_or_else(|e| debug!(target: "network", "Session registration error: {:?}", e));
}
!timed_out
}
pub fn token(&self) -> StreamToken {
self.connection.token()
self.connection().token()
}
fn read_packet(&mut self, packet: Packet, host: &HostInfo) -> Result<SessionData, UtilError> {
@ -288,7 +345,7 @@ impl Session {
while packet_id < self.info.capabilities[i].id_offset {
i += 1;
if i == self.info.capabilities.len() {
debug!(target: "net", "Unknown packet: {:?}", packet_id);
debug!(target: "network", "Unknown packet: {:?}", packet_id);
return Ok(SessionData::None)
}
}
@ -299,7 +356,7 @@ impl Session {
Ok(SessionData::Packet { data: packet.data, protocol: protocol, packet_id: pid } )
},
_ => {
debug!(target: "net", "Unknown packet: {:?}", packet_id);
debug!(target: "network", "Unknown packet: {:?}", packet_id);
Ok(SessionData::None)
}
}
@ -314,7 +371,7 @@ impl Session {
.append(&host.capabilities)
.append(&host.local_endpoint.address.port())
.append(host.id());
self.connection.send_packet(&rlp.out())
self.send(rlp)
}
fn read_hello(&mut self, rlp: &UntrustedRlp, host: &HostInfo) -> Result<(), UtilError> {
@ -384,11 +441,13 @@ impl Session {
/// Disconnect this session
pub fn disconnect(&mut self, reason: DisconnectReason) -> NetworkError {
let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32));
rlp.begin_list(1);
rlp.append(&(reason as u32));
self.connection.send_packet(&rlp.out()).ok();
if let State::Session(_) = self.state {
let mut rlp = RlpStream::new();
rlp.append(&(PACKET_DISCONNECT as u32));
rlp.begin_list(1);
rlp.append(&(reason as u32));
self.send(rlp).ok();
}
NetworkError::Disconnect(reason)
}
@ -400,7 +459,15 @@ impl Session {
}
fn send(&mut self, rlp: RlpStream) -> Result<(), UtilError> {
self.connection.send_packet(&rlp.out())
match self.state {
State::Handshake(_) => {
warn!(target:"network", "Unexpected send request");
},
State::Session(ref mut s) => {
try!(s.send_packet(&rlp.out()))
},
}
Ok(())
}
}