Merge branch 'master' of github.com:ethcore/parity into presale_wallet

This commit is contained in:
debris 2016-06-21 15:19:31 +02:00
commit 94a0193047
14 changed files with 294 additions and 74 deletions

4
Cargo.lock generated
View File

@ -1103,7 +1103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rocksdb"
version = "0.4.5"
source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9"
source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4"
dependencies = [
"libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)",
@ -1112,7 +1112,7 @@ dependencies = [
[[package]]
name = "rocksdb-sys"
version = "0.3.0"
source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9"
source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4"
dependencies = [
"gcc 0.3.28 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -686,14 +686,15 @@ impl TransactionQueue {
if let Some(order) = self.future.drop(&address, &nonce) {
// Let's insert that transaction to current (if it has higher gas_price)
let future_tx = self.by_hash.remove(&order.hash).unwrap();
try!(check_too_cheap(Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash)));
// if transaction in `current` (then one we are importing) is replaced it means that it has to low gas_price
try!(check_too_cheap(!Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash)));
}
// Also enforce the limit
let removed = self.current.enforce_limit(&mut self.by_hash);
// If some transaction were removed because of limit we need to update last_nonces also.
self.update_last_nonces(&removed);
// Trigger error if we were removed.
// Trigger error if the transaction we are importing was removed.
try!(check_if_removed(&address, &nonce, removed));
trace!(target: "miner", "status: {:?}", self.status());
@ -937,7 +938,7 @@ mod test {
let res = txq.add(tx2.clone(), &default_nonce, TransactionOrigin::External);
// and then there should be only one transaction in current (the one with higher gas_price)
assert_eq!(unwrap_tx_err(res), TransactionError::TooCheapToReplace);
assert_eq!(res.unwrap(), TransactionImportResult::Current);
assert_eq!(txq.status().pending, 1);
assert_eq!(txq.status().future, 0);
assert_eq!(txq.current.by_priority.len(), 1);

View File

@ -31,3 +31,11 @@ impl From<json::H160> for Address {
From::from(a)
}
}
impl<'a> From<&'a json::H160> for Address {
fn from(json: &'a json::H160) -> Self {
let mut a = [0u8; 20];
a.copy_from_slice(json);
From::from(a)
}
}

View File

@ -200,7 +200,10 @@ impl Configuration {
net_path.push("network");
ret.config_path = Some(net_path.to_str().unwrap().to_owned());
ret.reserved_nodes = self.init_reserved_nodes();
ret.reserved_only = self.args.flag_reserved_only;
if self.args.flag_reserved_only {
ret.non_reserved_mode = ::util::network::NonReservedPeerMode::Deny;
}
ret
}

View File

@ -230,6 +230,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
logger: logger.clone(),
settings: network_settings.clone(),
allow_pending_receipt_query: !conf.args.flag_geth,
net_service: service.network(),
});
let dependencies = rpc::Dependencies {
@ -315,11 +316,11 @@ fn execute_export(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: ::util::network::NonReservedPeerMode::Accept,
};
let client_config = conf.client_config(&spec);
@ -387,11 +388,11 @@ fn execute_import(conf: Configuration) {
udp_port: None,
nat_enabled: false,
discovery_enabled: false,
reserved_only: true,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: ::util::network::NonReservedPeerMode::Accept,
};
let client_config = conf.client_config(&spec);

View File

@ -89,6 +89,7 @@ fn current_version(path: &PathBuf) -> Result<u32, Error> {
/// Writes current database version to the file.
/// Creates a new file if the version file does not exist yet.
fn update_version(path: &PathBuf) -> Result<(), Error> {
try!(fs::create_dir_all(path));
let mut file = try!(File::create(version_file_path(path)));
try!(file.write_all(format!("{}", CURRENT_VERSION).as_bytes()));
Ok(())

View File

@ -25,6 +25,7 @@ use ethcore::client::Client;
use util::RotatingLogger;
use ethcore::account_provider::AccountProvider;
use util::network_settings::NetworkSettings;
use util::network::NetworkService;
#[cfg(feature="rpc")]
pub use ethcore_rpc::ConfirmationsQueue;
@ -89,6 +90,7 @@ pub struct Dependencies {
pub logger: Arc<RotatingLogger>,
pub settings: Arc<NetworkSettings>,
pub allow_pending_receipt_query: bool,
pub net_service: Arc<NetworkService<::ethcore::service::SyncMessage>>,
}
fn to_modules(apis: &[Api]) -> BTreeMap<String, String> {
@ -163,7 +165,7 @@ pub fn setup_rpc<T: Extendable>(server: T, deps: Arc<Dependencies>, apis: ApiSet
server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone()).to_delegate())
},
Api::EthcoreSet => {
server.add_delegate(EthcoreSetClient::new(&deps.miner).to_delegate())
server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate())
},
Api::Traces => {
server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate())

View File

@ -16,9 +16,11 @@
/// Ethcore-specific rpc interface for operations altering the settings.
use util::{U256, Address};
use util::network::{NetworkService, NonReservedPeerMode};
use std::sync::{Arc, Weak};
use jsonrpc_core::*;
use ethcore::miner::MinerService;
use ethcore::service::SyncMessage;
use v1::traits::EthcoreSet;
use v1::types::{Bytes};
@ -27,13 +29,15 @@ pub struct EthcoreSetClient<M> where
M: MinerService {
miner: Weak<M>,
net: Weak<NetworkService<SyncMessage>>,
}
impl<M> EthcoreSetClient<M> where M: MinerService {
/// Creates new `EthcoreSetClient`.
pub fn new(miner: &Arc<M>) -> Self {
pub fn new(miner: &Arc<M>, net: &Arc<NetworkService<SyncMessage>>) -> Self {
EthcoreSetClient {
miner: Arc::downgrade(miner),
net: Arc::downgrade(net),
}
}
}
@ -74,4 +78,32 @@ impl<M> EthcoreSet for EthcoreSetClient<M> where M: MinerService + 'static {
to_value(&true)
})
}
fn add_reserved_peer(&self, params: Params) -> Result<Value, Error> {
from_params::<(String,)>(params).and_then(|(peer,)| {
match take_weak!(self.net).add_reserved_peer(&peer) {
Ok(()) => to_value(&true),
Err(_) => Err(Error::invalid_params()),
}
})
}
fn remove_reserved_peer(&self, params: Params) -> Result<Value, Error> {
from_params::<(String,)>(params).and_then(|(peer,)| {
match take_weak!(self.net).remove_reserved_peer(&peer) {
Ok(()) => to_value(&true),
Err(_) => Err(Error::invalid_params()),
}
})
}
fn drop_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny);
to_value(&true)
}
fn accept_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept);
to_value(&true)
}
}

View File

@ -19,11 +19,13 @@ use std::str::FromStr;
use jsonrpc_core::IoHandler;
use v1::{Ethcore, EthcoreClient, EthcoreSet, EthcoreSetClient};
use ethcore::miner::MinerService;
use ethcore::service::SyncMessage;
use v1::tests::helpers::TestMinerService;
use ethcore::client::{TestBlockChainClient};
use util::numbers::*;
use rustc_serialize::hex::FromHex;
use util::log::RotatingLogger;
use util::network::{NetworkConfiguration, NetworkService};
use util::network_settings::NetworkSettings;
fn miner_service() -> Arc<TestMinerService> {
@ -50,21 +52,26 @@ fn settings() -> Arc<NetworkSettings> {
})
}
fn network_service() -> Arc<NetworkService<SyncMessage>> {
Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap())
}
fn ethcore_client(client: &Arc<TestBlockChainClient>, miner: &Arc<TestMinerService>) -> EthcoreClient<TestBlockChainClient, TestMinerService> {
EthcoreClient::new(client, miner, logger(), settings())
}
fn ethcore_set_client(miner: &Arc<TestMinerService>) -> EthcoreSetClient<TestMinerService> {
EthcoreSetClient::new(miner)
fn ethcore_set_client(miner: &Arc<TestMinerService>, net: &Arc<NetworkService<SyncMessage>>) -> EthcoreSetClient<TestMinerService> {
EthcoreSetClient::new(miner, net)
}
#[test]
fn rpc_ethcore_extra_data() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_extraData", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x01020304","id":1}"#;
@ -79,9 +86,10 @@ fn rpc_ethcore_default_extra_data() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_defaultExtraData", "params": [], "id": 1}"#;
let response = format!(r#"{{"jsonrpc":"2.0","result":"0x{}","id":1}}"#, misc::version_data().to_hex());
@ -93,9 +101,10 @@ fn rpc_ethcore_default_extra_data() {
fn rpc_ethcore_gas_floor_target() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_gasFloorTarget", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x3039","id":1}"#;
@ -107,9 +116,10 @@ fn rpc_ethcore_gas_floor_target() {
fn rpc_ethcore_min_gas_price() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_minGasPrice", "params": [], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x01312d00","id":1}"#;
@ -121,9 +131,10 @@ fn rpc_ethcore_min_gas_price() {
fn rpc_ethcore_set_min_gas_price() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@ -136,9 +147,10 @@ fn rpc_ethcore_set_min_gas_price() {
fn rpc_ethcore_set_gas_floor_target() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@ -151,9 +163,10 @@ fn rpc_ethcore_set_gas_floor_target() {
fn rpc_ethcore_set_extra_data() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@ -166,9 +179,10 @@ fn rpc_ethcore_set_extra_data() {
fn rpc_ethcore_set_author() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@ -181,13 +195,14 @@ fn rpc_ethcore_set_author() {
fn rpc_ethcore_dev_logs() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let logger = logger();
logger.append("a".to_owned());
logger.append("b".to_owned());
let ethcore = EthcoreClient::new(&client, &miner, logger.clone(), settings()).to_delegate();
let io = IoHandler::new();
io.add_delegate(ethcore);
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogs", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":["b","a"],"id":1}"#;
@ -199,9 +214,10 @@ fn rpc_ethcore_dev_logs() {
fn rpc_ethcore_dev_logs_levels() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#;
@ -212,9 +228,10 @@ fn rpc_ethcore_dev_logs_levels() {
fn rpc_ethcore_set_transactions_limit() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
@ -227,9 +244,10 @@ fn rpc_ethcore_set_transactions_limit() {
fn rpc_ethcore_transactions_limit() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#;
@ -241,9 +259,10 @@ fn rpc_ethcore_transactions_limit() {
fn rpc_ethcore_net_chain() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netChain", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"testchain","id":1}"#;
@ -255,9 +274,10 @@ fn rpc_ethcore_net_chain() {
fn rpc_ethcore_net_max_peers() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netMaxPeers", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":25,"id":1}"#;
@ -269,9 +289,10 @@ fn rpc_ethcore_net_max_peers() {
fn rpc_ethcore_net_port() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPort", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":30303,"id":1}"#;
@ -283,9 +304,10 @@ fn rpc_ethcore_net_port() {
fn rpc_ethcore_rpc_settings() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_rpcSettings", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"enabled":true,"interface":"all","port":8545},"id":1}"#;
@ -297,9 +319,10 @@ fn rpc_ethcore_rpc_settings() {
fn rpc_ethcore_node_name() {
let miner = miner_service();
let client = client_service();
let network = network_service();
let io = IoHandler::new();
io.add_delegate(ethcore_client(&client, &miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner).to_delegate());
io.add_delegate(ethcore_set_client(&miner, &network).to_delegate());
let request = r#"{"jsonrpc": "2.0", "method": "ethcore_nodeName", "params":[], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"mynode","id":1}"#;

View File

@ -37,6 +37,18 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static {
/// Sets the limits for transaction queue.
fn set_transactions_limit(&self, _: Params) -> Result<Value, Error>;
/// Add a reserved peer.
fn add_reserved_peer(&self, _: Params) -> Result<Value, Error>;
/// Remove a reserved peer.
fn remove_reserved_peer(&self, _: Params) -> Result<Value, Error>;
/// Drop all non-reserved peers.
fn drop_non_reserved_peers(&self, _: Params) -> Result<Value, Error>;
/// Accept non-reserved peers (default behavior)
fn accept_non_reserved_peers(&self, _: Params) -> Result<Value, Error>;
/// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self));
@ -45,6 +57,10 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static {
delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data);
delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author);
delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit);
delegate.add_method("ethcore_addReservedPeer", EthcoreSet::add_reserved_peer);
delegate.add_method("ethcore_removeReservedPeer", EthcoreSet::remove_reserved_peer);
delegate.add_method("ethcore_dropNonReservedPeers", EthcoreSet::drop_non_reserved_peers);
delegate.add_method("ethcore_acceptNonReservedPeers", EthcoreSet::accept_non_reserved_peers);
delegate
}

View File

@ -20,6 +20,9 @@ use std::default::Default;
use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector, DBIterator,
IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction};
const DB_FILE_SIZE_BASE: u64 = 10 * 1024 * 1024;
const DB_FILE_SIZE_MULTIPLIER: i32 = 5;
/// Write transaction. Batches a sequence of put/delete operations for efficiency.
pub struct DBTransaction {
batch: WriteBatch,
@ -64,7 +67,7 @@ impl DatabaseConfig {
DatabaseConfig {
cache_size: Some(cache_size),
prefix_size: None,
max_open_files: 256
max_open_files: -1,
}
}
}
@ -74,7 +77,7 @@ impl Default for DatabaseConfig {
DatabaseConfig {
cache_size: None,
prefix_size: None,
max_open_files: 256
max_open_files: -1,
}
}
}
@ -110,6 +113,8 @@ impl Database {
opts.create_if_missing(true);
opts.set_use_fsync(false);
opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction);
opts.set_target_file_size_base(DB_FILE_SIZE_BASE);
opts.set_target_file_size_multiplier(DB_FILE_SIZE_MULTIPLIER);
if let Some(cache_size) = config.cache_size {
// half goes to read cache
opts.set_block_cache_size_mb(cache_size as u64 / 2);

View File

@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::str::{FromStr};
use std::net::SocketAddr;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::*;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
@ -35,7 +35,7 @@ use rlp::*;
use network::session::{Session, SessionData};
use error::*;
use io::*;
use network::{NetworkProtocolHandler, PROTOCOL_VERSION};
use network::{NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION};
use network::node_table::*;
use network::stats::NetworkStats;
use network::error::{NetworkError, DisconnectReason};
@ -65,8 +65,6 @@ pub struct NetworkConfiguration {
pub nat_enabled: bool,
/// Enable discovery
pub discovery_enabled: bool,
/// Pin to reserved nodes only
pub reserved_only: bool,
/// List of initial node addresses
pub boot_nodes: Vec<String>,
/// Use provided node key instead of default
@ -75,6 +73,8 @@ pub struct NetworkConfiguration {
pub ideal_peers: u32,
/// List of reserved node addresses.
pub reserved_nodes: Vec<String>,
/// The non-reserved peer mode.
pub non_reserved_mode: NonReservedPeerMode,
}
impl Default for NetworkConfiguration {
@ -93,11 +93,11 @@ impl NetworkConfiguration {
udp_port: None,
nat_enabled: true,
discovery_enabled: true,
reserved_only: false,
boot_nodes: Vec::new(),
use_secret: None,
ideal_peers: 25,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Accept,
}
}
@ -191,13 +191,15 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<SharedSession>,
session_id: Option<StreamToken>,
reserved_peers: &'s HashSet<NodeId>,
}
impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s IoContext<NetworkIoMessage<Message>>,
protocol: ProtocolId,
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> {
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>,
reserved_peers: &'s HashSet<NodeId>) -> NetworkContext<'s, Message> {
let id = session.as_ref().map(|s| s.lock().unwrap().token());
NetworkContext {
io: io,
@ -205,6 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
session_id: id,
session: session,
sessions: sessions,
reserved_peers: reserved_peers,
}
}
@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::User(msg));
}
/// Send an IO message
/// Get an IoChannel.
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> {
self.io.channel()
}
@ -335,7 +338,7 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
timer_counter: RwLock<usize>,
stats: Arc<NetworkStats>,
pinned_nodes: Vec<NodeId>,
reserved_nodes: RwLock<HashSet<NodeId>>,
num_sessions: AtomicUsize,
stopping: AtomicBool,
}
@ -390,28 +393,28 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER),
stats: stats,
pinned_nodes: Vec::new(),
reserved_nodes: RwLock::new(HashSet::new()),
num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false),
};
for n in boot_nodes {
// don't pin boot nodes.
host.add_node(&n, false);
host.add_node(&n);
}
for n in reserved_nodes {
host.add_node(&n, true);
if let Err(e) = host.add_reserved_node(&n) {
debug!(target: "network", "Error parsing node id: {}: {:?}", n, e);
}
}
Ok(host)
}
pub fn add_node(&mut self, id: &str, pin: bool) {
pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
Ok(n) => {
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
if pin { self.pinned_nodes.push(n.id.clone()) }
self.nodes.write().unwrap().add_node(n);
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
@ -421,6 +424,56 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
pub fn add_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() };
self.reserved_nodes.write().unwrap().insert(n.id.clone());
if let Some(ref mut discovery) = *self.discovery.lock().unwrap() {
discovery.add_node(entry);
}
Ok(())
}
pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext<NetworkIoMessage<Message>>) {
let mut info = self.info.write().unwrap();
if info.config.non_reserved_mode != mode {
info.config.non_reserved_mode = mode.clone();
drop(info);
if let NonReservedPeerMode::Deny = mode {
// disconnect all non-reserved peers here.
let reserved: HashSet<NodeId> = self.reserved_nodes.read().unwrap().clone();
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
{
let id = s.id();
if id.is_some() && reserved.contains(id.unwrap()) {
continue;
}
}
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on reserved-only mode: {}", p);
self.kill_connection(p, io, false);
}
}
}
}
pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> {
let n = try!(Node::from_str(id));
self.reserved_nodes.write().unwrap().remove(&n.id);
Ok(())
}
pub fn client_version() -> String {
version()
}
@ -483,7 +536,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// Initialize discovery.
let discovery = {
let info = self.info.read().unwrap();
if info.config.discovery_enabled && !info.config.reserved_only {
if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept {
Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY))
} else { None }
};
@ -540,17 +593,26 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
fn connect_peers(&self, io: &IoContext<NetworkIoMessage<Message>>) {
if self.info.read().unwrap().capabilities.is_empty() {
return;
}
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
let pin = { self.info.read().unwrap().config.reserved_only };
let session_count = self.session_count();
if session_count >= ideal_peers as usize + self.pinned_nodes.len() {
// check if all pinned nodes are connected.
if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
let (ideal_peers, mut pin) = {
let info = self.info.read().unwrap();
if info.capabilities.is_empty() {
return;
}
let config = &info.config;
(config.ideal_peers, config.non_reserved_mode == NonReservedPeerMode::Deny)
};
let session_count = self.session_count();
let reserved_nodes = self.reserved_nodes.read().unwrap();
if session_count >= ideal_peers as usize + reserved_nodes.len() {
// check if all pinned nodes are connected.
if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) {
return;
}
// if not, only attempt connect to reserved peers
pin = true;
}
let handshake_count = self.handshake_count();
@ -562,7 +624,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
// iterate over all nodes, reserved ones coming first.
// if we are pinned to only reserved nodes, ignore all others.
let nodes = self.pinned_nodes.iter().cloned().chain(if !pin {
let nodes = reserved_nodes.iter().cloned().chain(if !pin {
self.nodes.read().unwrap().nodes()
} else {
Vec::new()
@ -696,11 +758,20 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst);
if !s.info.originated {
let session_count = self.session_count();
let ideal_peers = { self.info.read().unwrap().config.ideal_peers };
if session_count >= ideal_peers as usize {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
let reserved_nodes = self.reserved_nodes.read().unwrap();
let (ideal_peers, reserved_only) = {
let info = self.info.read().unwrap();
(info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny)
};
if session_count >= ideal_peers as usize || reserved_only {
// only proceed if the connecting peer is reserved.
if !reserved_nodes.contains(s.id().unwrap()) {
s.disconnect(io, DisconnectReason::TooManyPeers);
return;
}
}
// Add it no node table
if let Ok(address) = s.remote_addr() {
let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } };
@ -735,14 +806,17 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
if kill {
self.kill_connection(token, io, true);
}
let handlers = self.handlers.read().unwrap();
for p in ready_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
let h = handlers.get(p).unwrap().clone();
self.stats.inc_sessions();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token);
let reserved = self.reserved_nodes.read().unwrap();
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token);
}
for (p, packet_id, data) in packet_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]);
let h = handlers.get(p).unwrap().clone();
let reserved = self.reserved_nodes.read().unwrap();
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]);
}
}
@ -783,7 +857,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token);
let reserved = self.reserved_nodes.read().unwrap();
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token);
}
if deregister {
io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e));
@ -886,7 +961,10 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
Some(h) => {
let reserved = self.reserved_nodes.read().unwrap();
h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token);
}
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
}
@ -904,7 +982,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
ref versions
} => {
let h = handler.clone();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone()));
let reserved = self.reserved_nodes.read().unwrap();
h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved));
self.handlers.write().unwrap().insert(protocol, h);
let mut info = self.info.write().unwrap();
for v in versions {
@ -946,8 +1025,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
self.kill_connection(*peer, io, false);
},
NetworkIoMessage::User(ref message) => {
let reserved = self.reserved_nodes.read().unwrap();
for (p, h) in self.handlers.read().unwrap().iter() {
h.message(&NetworkContext::new(io, p, None, self.sessions.clone()), &message);
h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message);
}
}
}

View File

@ -14,8 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
//! Network and general IO module.
//!
//! Network and general IO module.
//!
//! Example usage for craeting a network service and adding an IO handler:
//!
//! ```rust
@ -112,3 +112,22 @@ pub trait NetworkProtocolHandler<Message>: Sync + Send where Message: Send + Syn
fn message(&self, _io: &NetworkContext<Message>, _message: &Message) {}
}
/// Non-reserved peer modes.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NonReservedPeerMode {
/// Accept them. This is the default.
Accept,
/// Deny them.
Deny,
}
impl NonReservedPeerMode {
/// Attempt to parse the peer mode from a string.
pub fn parse(s: &str) -> Option<Self> {
match s {
"accept" => Some(NonReservedPeerMode::Accept),
"deny" => Some(NonReservedPeerMode::Deny),
_ => None,
}
}
}

View File

@ -18,9 +18,9 @@ use std::sync::*;
use error::*;
use panics::*;
use network::{NetworkProtocolHandler, NetworkConfiguration};
use network::error::{NetworkError};
use network::error::NetworkError;
use network::host::{Host, NetworkIoMessage, ProtocolId};
use network::stats::{NetworkStats};
use network::stats::NetworkStats;
use io::*;
/// IO Service with networking
@ -111,6 +111,35 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
*host = None;
Ok(())
}
/// Try to add a reserved peer.
pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
host.add_reserved_node(peer)
} else {
Ok(())
}
}
/// Try to remove a reserved peer.
pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
host.remove_reserved_node(peer)
} else {
Ok(())
}
}
/// Set the non-reserved peer mode.
pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) {
let host = self.host.read().unwrap();
if let Some(ref host) = *host {
let io_ctxt = IoContext::new(self.io_service.channel(), 0);
host.set_non_reserved_mode(mode, &io_ctxt);
}
}
}
impl<Message> MayPanic for NetworkService<Message> where Message: Send + Sync + Clone + 'static {