diff --git a/Cargo.lock b/Cargo.lock index dc97ce0ca..1f41ea021 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1103,7 +1103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "rocksdb" version = "0.4.5" -source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9" +source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4" dependencies = [ "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "rocksdb-sys 0.3.0 (git+https://github.com/ethcore/rust-rocksdb)", @@ -1112,7 +1112,7 @@ dependencies = [ [[package]] name = "rocksdb-sys" version = "0.3.0" -source = "git+https://github.com/ethcore/rust-rocksdb#9140e37ce0fdb748097f85653c01b0f7e3736ea9" +source = "git+https://github.com/ethcore/rust-rocksdb#e0e6c099d8cd156fe446009fce241d57b00cd8f4" dependencies = [ "gcc 0.3.28 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index cc8430b42..06aa07880 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -686,14 +686,15 @@ impl TransactionQueue { if let Some(order) = self.future.drop(&address, &nonce) { // Let's insert that transaction to current (if it has higher gas_price) let future_tx = self.by_hash.remove(&order.hash).unwrap(); - try!(check_too_cheap(Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash))); + // if transaction in `current` (then one we are importing) is replaced it means that it has to low gas_price + try!(check_too_cheap(!Self::replace_transaction(future_tx, state_nonce, &mut self.current, &mut self.by_hash))); } // Also enforce the limit let removed = self.current.enforce_limit(&mut self.by_hash); // If some transaction were removed because of limit we need to update last_nonces also. self.update_last_nonces(&removed); - // Trigger error if we were removed. + // Trigger error if the transaction we are importing was removed. try!(check_if_removed(&address, &nonce, removed)); trace!(target: "miner", "status: {:?}", self.status()); @@ -937,7 +938,7 @@ mod test { let res = txq.add(tx2.clone(), &default_nonce, TransactionOrigin::External); // and then there should be only one transaction in current (the one with higher gas_price) - assert_eq!(unwrap_tx_err(res), TransactionError::TooCheapToReplace); + assert_eq!(res.unwrap(), TransactionImportResult::Current); assert_eq!(txq.status().pending, 1); assert_eq!(txq.status().future, 0); assert_eq!(txq.current.by_priority.len(), 1); diff --git a/ethstore/src/ethkey.rs b/ethstore/src/ethkey.rs index eba877397..9d8858b79 100644 --- a/ethstore/src/ethkey.rs +++ b/ethstore/src/ethkey.rs @@ -31,3 +31,11 @@ impl From for Address { From::from(a) } } + +impl<'a> From<&'a json::H160> for Address { + fn from(json: &'a json::H160) -> Self { + let mut a = [0u8; 20]; + a.copy_from_slice(json); + From::from(a) + } +} diff --git a/parity/configuration.rs b/parity/configuration.rs index 3f8202021..e6e6e36a1 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -200,7 +200,10 @@ impl Configuration { net_path.push("network"); ret.config_path = Some(net_path.to_str().unwrap().to_owned()); ret.reserved_nodes = self.init_reserved_nodes(); - ret.reserved_only = self.args.flag_reserved_only; + + if self.args.flag_reserved_only { + ret.non_reserved_mode = ::util::network::NonReservedPeerMode::Deny; + } ret } diff --git a/parity/main.rs b/parity/main.rs index dc5457aa1..a44cdf6d3 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -230,6 +230,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) logger: logger.clone(), settings: network_settings.clone(), allow_pending_receipt_query: !conf.args.flag_geth, + net_service: service.network(), }); let dependencies = rpc::Dependencies { @@ -315,11 +316,11 @@ fn execute_export(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); @@ -387,11 +388,11 @@ fn execute_import(conf: Configuration) { udp_port: None, nat_enabled: false, discovery_enabled: false, - reserved_only: true, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 0, reserved_nodes: Vec::new(), + non_reserved_mode: ::util::network::NonReservedPeerMode::Accept, }; let client_config = conf.client_config(&spec); diff --git a/parity/migration.rs b/parity/migration.rs index 9f1c4c52e..4198f7ed0 100644 --- a/parity/migration.rs +++ b/parity/migration.rs @@ -89,6 +89,7 @@ fn current_version(path: &PathBuf) -> Result { /// Writes current database version to the file. /// Creates a new file if the version file does not exist yet. fn update_version(path: &PathBuf) -> Result<(), Error> { + try!(fs::create_dir_all(path)); let mut file = try!(File::create(version_file_path(path))); try!(file.write_all(format!("{}", CURRENT_VERSION).as_bytes())); Ok(()) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index bf21181a1..251c24d85 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -25,6 +25,7 @@ use ethcore::client::Client; use util::RotatingLogger; use ethcore::account_provider::AccountProvider; use util::network_settings::NetworkSettings; +use util::network::NetworkService; #[cfg(feature="rpc")] pub use ethcore_rpc::ConfirmationsQueue; @@ -89,6 +90,7 @@ pub struct Dependencies { pub logger: Arc, pub settings: Arc, pub allow_pending_receipt_query: bool, + pub net_service: Arc>, } fn to_modules(apis: &[Api]) -> BTreeMap { @@ -163,7 +165,7 @@ pub fn setup_rpc(server: T, deps: Arc, apis: ApiSet server.add_delegate(EthcoreClient::new(&deps.client, &deps.miner, deps.logger.clone(), deps.settings.clone()).to_delegate()) }, Api::EthcoreSet => { - server.add_delegate(EthcoreSetClient::new(&deps.miner).to_delegate()) + server.add_delegate(EthcoreSetClient::new(&deps.miner, &deps.net_service).to_delegate()) }, Api::Traces => { server.add_delegate(TracesClient::new(&deps.client, &deps.miner).to_delegate()) diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index cbd9c4309..66f1a34aa 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -16,9 +16,11 @@ /// Ethcore-specific rpc interface for operations altering the settings. use util::{U256, Address}; +use util::network::{NetworkService, NonReservedPeerMode}; use std::sync::{Arc, Weak}; use jsonrpc_core::*; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::traits::EthcoreSet; use v1::types::{Bytes}; @@ -27,13 +29,15 @@ pub struct EthcoreSetClient where M: MinerService { miner: Weak, + net: Weak>, } impl EthcoreSetClient where M: MinerService { /// Creates new `EthcoreSetClient`. - pub fn new(miner: &Arc) -> Self { + pub fn new(miner: &Arc, net: &Arc>) -> Self { EthcoreSetClient { miner: Arc::downgrade(miner), + net: Arc::downgrade(net), } } } @@ -74,4 +78,32 @@ impl EthcoreSet for EthcoreSetClient where M: MinerService + 'static { to_value(&true) }) } + + fn add_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).add_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn remove_reserved_peer(&self, params: Params) -> Result { + from_params::<(String,)>(params).and_then(|(peer,)| { + match take_weak!(self.net).remove_reserved_peer(&peer) { + Ok(()) => to_value(&true), + Err(_) => Err(Error::invalid_params()), + } + }) + } + + fn drop_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny); + to_value(&true) + } + + fn accept_non_reserved_peers(&self, _: Params) -> Result { + take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); + to_value(&true) + } } diff --git a/rpc/src/v1/tests/mocked/ethcore.rs b/rpc/src/v1/tests/mocked/ethcore.rs index 68c33ecce..a096543e3 100644 --- a/rpc/src/v1/tests/mocked/ethcore.rs +++ b/rpc/src/v1/tests/mocked/ethcore.rs @@ -19,11 +19,13 @@ use std::str::FromStr; use jsonrpc_core::IoHandler; use v1::{Ethcore, EthcoreClient, EthcoreSet, EthcoreSetClient}; use ethcore::miner::MinerService; +use ethcore::service::SyncMessage; use v1::tests::helpers::TestMinerService; use ethcore::client::{TestBlockChainClient}; use util::numbers::*; use rustc_serialize::hex::FromHex; use util::log::RotatingLogger; +use util::network::{NetworkConfiguration, NetworkService}; use util::network_settings::NetworkSettings; fn miner_service() -> Arc { @@ -50,21 +52,26 @@ fn settings() -> Arc { }) } +fn network_service() -> Arc> { + Arc::new(NetworkService::new(NetworkConfiguration::new()).unwrap()) +} + fn ethcore_client(client: &Arc, miner: &Arc) -> EthcoreClient { EthcoreClient::new(client, miner, logger(), settings()) } -fn ethcore_set_client(miner: &Arc) -> EthcoreSetClient { - EthcoreSetClient::new(miner) +fn ethcore_set_client(miner: &Arc, net: &Arc>) -> EthcoreSetClient { + EthcoreSetClient::new(miner, net) } #[test] fn rpc_ethcore_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_extraData", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01020304","id":1}"#; @@ -79,9 +86,10 @@ fn rpc_ethcore_default_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_defaultExtraData", "params": [], "id": 1}"#; let response = format!(r#"{{"jsonrpc":"2.0","result":"0x{}","id":1}}"#, misc::version_data().to_hex()); @@ -93,9 +101,10 @@ fn rpc_ethcore_default_extra_data() { fn rpc_ethcore_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_gasFloorTarget", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x3039","id":1}"#; @@ -107,9 +116,10 @@ fn rpc_ethcore_gas_floor_target() { fn rpc_ethcore_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_minGasPrice", "params": [], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"0x01312d00","id":1}"#; @@ -121,9 +131,10 @@ fn rpc_ethcore_min_gas_price() { fn rpc_ethcore_set_min_gas_price() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setMinGasPrice", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -136,9 +147,10 @@ fn rpc_ethcore_set_min_gas_price() { fn rpc_ethcore_set_gas_floor_target() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setGasFloorTarget", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -151,9 +163,10 @@ fn rpc_ethcore_set_gas_floor_target() { fn rpc_ethcore_set_extra_data() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setExtraData", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -166,9 +179,10 @@ fn rpc_ethcore_set_extra_data() { fn rpc_ethcore_set_author() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setAuthor", "params":["0xcd1722f3947def4cf144679da39c4c32bdc35681"], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -181,13 +195,14 @@ fn rpc_ethcore_set_author() { fn rpc_ethcore_dev_logs() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let logger = logger(); logger.append("a".to_owned()); logger.append("b".to_owned()); let ethcore = EthcoreClient::new(&client, &miner, logger.clone(), settings()).to_delegate(); let io = IoHandler::new(); io.add_delegate(ethcore); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogs", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":["b","a"],"id":1}"#; @@ -199,9 +214,10 @@ fn rpc_ethcore_dev_logs() { fn rpc_ethcore_dev_logs_levels() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_devLogsLevels", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"rpc=trace","id":1}"#; @@ -212,9 +228,10 @@ fn rpc_ethcore_dev_logs_levels() { fn rpc_ethcore_set_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_setTransactionsLimit", "params":[10240240], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; @@ -227,9 +244,10 @@ fn rpc_ethcore_set_transactions_limit() { fn rpc_ethcore_transactions_limit() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_transactionsLimit", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":1024,"id":1}"#; @@ -241,9 +259,10 @@ fn rpc_ethcore_transactions_limit() { fn rpc_ethcore_net_chain() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netChain", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"testchain","id":1}"#; @@ -255,9 +274,10 @@ fn rpc_ethcore_net_chain() { fn rpc_ethcore_net_max_peers() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netMaxPeers", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":25,"id":1}"#; @@ -269,9 +289,10 @@ fn rpc_ethcore_net_max_peers() { fn rpc_ethcore_net_port() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_netPort", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":30303,"id":1}"#; @@ -283,9 +304,10 @@ fn rpc_ethcore_net_port() { fn rpc_ethcore_rpc_settings() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_rpcSettings", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":{"enabled":true,"interface":"all","port":8545},"id":1}"#; @@ -297,9 +319,10 @@ fn rpc_ethcore_rpc_settings() { fn rpc_ethcore_node_name() { let miner = miner_service(); let client = client_service(); + let network = network_service(); let io = IoHandler::new(); io.add_delegate(ethcore_client(&client, &miner).to_delegate()); - io.add_delegate(ethcore_set_client(&miner).to_delegate()); + io.add_delegate(ethcore_set_client(&miner, &network).to_delegate()); let request = r#"{"jsonrpc": "2.0", "method": "ethcore_nodeName", "params":[], "id": 1}"#; let response = r#"{"jsonrpc":"2.0","result":"mynode","id":1}"#; diff --git a/rpc/src/v1/traits/ethcore_set.rs b/rpc/src/v1/traits/ethcore_set.rs index 332c505b6..ed4303be1 100644 --- a/rpc/src/v1/traits/ethcore_set.rs +++ b/rpc/src/v1/traits/ethcore_set.rs @@ -37,6 +37,18 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { /// Sets the limits for transaction queue. fn set_transactions_limit(&self, _: Params) -> Result; + /// Add a reserved peer. + fn add_reserved_peer(&self, _: Params) -> Result; + + /// Remove a reserved peer. + fn remove_reserved_peer(&self, _: Params) -> Result; + + /// Drop all non-reserved peers. + fn drop_non_reserved_peers(&self, _: Params) -> Result; + + /// Accept non-reserved peers (default behavior) + fn accept_non_reserved_peers(&self, _: Params) -> Result; + /// Should be used to convert object to io delegate. fn to_delegate(self) -> IoDelegate { let mut delegate = IoDelegate::new(Arc::new(self)); @@ -45,6 +57,10 @@ pub trait EthcoreSet: Sized + Send + Sync + 'static { delegate.add_method("ethcore_setExtraData", EthcoreSet::set_extra_data); delegate.add_method("ethcore_setAuthor", EthcoreSet::set_author); delegate.add_method("ethcore_setTransactionsLimit", EthcoreSet::set_transactions_limit); + delegate.add_method("ethcore_addReservedPeer", EthcoreSet::add_reserved_peer); + delegate.add_method("ethcore_removeReservedPeer", EthcoreSet::remove_reserved_peer); + delegate.add_method("ethcore_dropNonReservedPeers", EthcoreSet::drop_non_reserved_peers); + delegate.add_method("ethcore_acceptNonReservedPeers", EthcoreSet::accept_non_reserved_peers); delegate } diff --git a/util/src/kvdb.rs b/util/src/kvdb.rs index 7d581fdbc..0911b3471 100644 --- a/util/src/kvdb.rs +++ b/util/src/kvdb.rs @@ -20,6 +20,9 @@ use std::default::Default; use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector, DBIterator, IndexType, Options, DBCompactionStyle, BlockBasedOptions, Direction}; +const DB_FILE_SIZE_BASE: u64 = 10 * 1024 * 1024; +const DB_FILE_SIZE_MULTIPLIER: i32 = 5; + /// Write transaction. Batches a sequence of put/delete operations for efficiency. pub struct DBTransaction { batch: WriteBatch, @@ -64,7 +67,7 @@ impl DatabaseConfig { DatabaseConfig { cache_size: Some(cache_size), prefix_size: None, - max_open_files: 256 + max_open_files: -1, } } } @@ -74,7 +77,7 @@ impl Default for DatabaseConfig { DatabaseConfig { cache_size: None, prefix_size: None, - max_open_files: 256 + max_open_files: -1, } } } @@ -110,6 +113,8 @@ impl Database { opts.create_if_missing(true); opts.set_use_fsync(false); opts.set_compaction_style(DBCompactionStyle::DBUniversalCompaction); + opts.set_target_file_size_base(DB_FILE_SIZE_BASE); + opts.set_target_file_size_multiplier(DB_FILE_SIZE_MULTIPLIER); if let Some(cache_size) = config.cache_size { // half goes to read cache opts.set_block_cache_size_mb(cache_size as u64 / 2); diff --git a/util/src/network/host.rs b/util/src/network/host.rs index aacfc3fcb..03ba07544 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use std::net::{SocketAddr}; -use std::collections::{HashMap}; -use std::str::{FromStr}; +use std::net::SocketAddr; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; use std::sync::*; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::ops::*; @@ -35,7 +35,7 @@ use rlp::*; use network::session::{Session, SessionData}; use error::*; use io::*; -use network::{NetworkProtocolHandler, PROTOCOL_VERSION}; +use network::{NetworkProtocolHandler, NonReservedPeerMode, PROTOCOL_VERSION}; use network::node_table::*; use network::stats::NetworkStats; use network::error::{NetworkError, DisconnectReason}; @@ -65,8 +65,6 @@ pub struct NetworkConfiguration { pub nat_enabled: bool, /// Enable discovery pub discovery_enabled: bool, - /// Pin to reserved nodes only - pub reserved_only: bool, /// List of initial node addresses pub boot_nodes: Vec, /// Use provided node key instead of default @@ -75,6 +73,8 @@ pub struct NetworkConfiguration { pub ideal_peers: u32, /// List of reserved node addresses. pub reserved_nodes: Vec, + /// The non-reserved peer mode. + pub non_reserved_mode: NonReservedPeerMode, } impl Default for NetworkConfiguration { @@ -93,11 +93,11 @@ impl NetworkConfiguration { udp_port: None, nat_enabled: true, discovery_enabled: true, - reserved_only: false, boot_nodes: Vec::new(), use_secret: None, ideal_peers: 25, reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Accept, } } @@ -191,13 +191,15 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta sessions: Arc>>, session: Option, session_id: Option, + reserved_peers: &'s HashSet, } impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, { /// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler. fn new(io: &'s IoContext>, protocol: ProtocolId, - session: Option, sessions: Arc>>) -> NetworkContext<'s, Message> { + session: Option, sessions: Arc>>, + reserved_peers: &'s HashSet) -> NetworkContext<'s, Message> { let id = session.as_ref().map(|s| s.lock().unwrap().token()); NetworkContext { io: io, @@ -205,6 +207,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone session_id: id, session: session, sessions: sessions, + reserved_peers: reserved_peers, } } @@ -237,7 +240,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone self.io.message(NetworkIoMessage::User(msg)); } - /// Send an IO message + /// Get an IoChannel. pub fn io_channel(&self) -> IoChannel> { self.io.channel() } @@ -335,7 +338,7 @@ pub struct Host where Message: Send + Sync + Clone { timers: RwLock>, timer_counter: RwLock, stats: Arc, - pinned_nodes: Vec, + reserved_nodes: RwLock>, num_sessions: AtomicUsize, stopping: AtomicBool, } @@ -390,28 +393,28 @@ impl Host where Message: Send + Sync + Clone { timers: RwLock::new(HashMap::new()), timer_counter: RwLock::new(USER_TIMER), stats: stats, - pinned_nodes: Vec::new(), + reserved_nodes: RwLock::new(HashSet::new()), num_sessions: AtomicUsize::new(0), stopping: AtomicBool::new(false), }; for n in boot_nodes { - // don't pin boot nodes. - host.add_node(&n, false); + host.add_node(&n); } for n in reserved_nodes { - host.add_node(&n, true); + if let Err(e) = host.add_reserved_node(&n) { + debug!(target: "network", "Error parsing node id: {}: {:?}", n, e); + } } Ok(host) } - pub fn add_node(&mut self, id: &str, pin: bool) { + pub fn add_node(&mut self, id: &str) { match Node::from_str(id) { Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); }, Ok(n) => { let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; - if pin { self.pinned_nodes.push(n.id.clone()) } self.nodes.write().unwrap().add_node(n); if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { @@ -421,6 +424,56 @@ impl Host where Message: Send + Sync + Clone { } } + pub fn add_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + + let entry = NodeEntry { endpoint: n.endpoint.clone(), id: n.id.clone() }; + self.reserved_nodes.write().unwrap().insert(n.id.clone()); + + if let Some(ref mut discovery) = *self.discovery.lock().unwrap() { + discovery.add_node(entry); + } + + Ok(()) + } + + pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext>) { + let mut info = self.info.write().unwrap(); + + if info.config.non_reserved_mode != mode { + info.config.non_reserved_mode = mode.clone(); + drop(info); + if let NonReservedPeerMode::Deny = mode { + // disconnect all non-reserved peers here. + let reserved: HashSet = self.reserved_nodes.read().unwrap().clone(); + let mut to_kill = Vec::new(); + for e in self.sessions.write().unwrap().iter_mut() { + let mut s = e.lock().unwrap(); + { + let id = s.id(); + if id.is_some() && reserved.contains(id.unwrap()) { + continue; + } + } + + s.disconnect(io, DisconnectReason::ClientQuit); + to_kill.push(s.token()); + } + for p in to_kill { + trace!(target: "network", "Disconnecting on reserved-only mode: {}", p); + self.kill_connection(p, io, false); + } + } + } + } + + pub fn remove_reserved_node(&self, id: &str) -> Result<(), UtilError> { + let n = try!(Node::from_str(id)); + self.reserved_nodes.write().unwrap().remove(&n.id); + + Ok(()) + } + pub fn client_version() -> String { version() } @@ -483,7 +536,7 @@ impl Host where Message: Send + Sync + Clone { // Initialize discovery. let discovery = { let info = self.info.read().unwrap(); - if info.config.discovery_enabled && !info.config.reserved_only { + if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { Some(Discovery::new(&info.keys, public_endpoint.address.clone(), public_endpoint, DISCOVERY)) } else { None } }; @@ -540,17 +593,26 @@ impl Host where Message: Send + Sync + Clone { } fn connect_peers(&self, io: &IoContext>) { - if self.info.read().unwrap().capabilities.is_empty() { - return; - } - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - let pin = { self.info.read().unwrap().config.reserved_only }; - let session_count = self.session_count(); - if session_count >= ideal_peers as usize + self.pinned_nodes.len() { - // check if all pinned nodes are connected. - if self.pinned_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + let (ideal_peers, mut pin) = { + let info = self.info.read().unwrap(); + if info.capabilities.is_empty() { return; } + let config = &info.config; + + (config.ideal_peers, config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + + let session_count = self.session_count(); + let reserved_nodes = self.reserved_nodes.read().unwrap(); + if session_count >= ideal_peers as usize + reserved_nodes.len() { + // check if all pinned nodes are connected. + if reserved_nodes.iter().all(|n| self.have_session(n) && self.connecting_to(n)) { + return; + } + + // if not, only attempt connect to reserved peers + pin = true; } let handshake_count = self.handshake_count(); @@ -562,7 +624,7 @@ impl Host where Message: Send + Sync + Clone { // iterate over all nodes, reserved ones coming first. // if we are pinned to only reserved nodes, ignore all others. - let nodes = self.pinned_nodes.iter().cloned().chain(if !pin { + let nodes = reserved_nodes.iter().cloned().chain(if !pin { self.nodes.read().unwrap().nodes() } else { Vec::new() @@ -696,11 +758,20 @@ impl Host where Message: Send + Sync + Clone { self.num_sessions.fetch_add(1, AtomicOrdering::SeqCst); if !s.info.originated { let session_count = self.session_count(); - let ideal_peers = { self.info.read().unwrap().config.ideal_peers }; - if session_count >= ideal_peers as usize { - s.disconnect(io, DisconnectReason::TooManyPeers); - return; + let reserved_nodes = self.reserved_nodes.read().unwrap(); + let (ideal_peers, reserved_only) = { + let info = self.info.read().unwrap(); + (info.config.ideal_peers, info.config.non_reserved_mode == NonReservedPeerMode::Deny) + }; + + if session_count >= ideal_peers as usize || reserved_only { + // only proceed if the connecting peer is reserved. + if !reserved_nodes.contains(s.id().unwrap()) { + s.disconnect(io, DisconnectReason::TooManyPeers); + return; + } } + // Add it no node table if let Ok(address) = s.remote_addr() { let entry = NodeEntry { id: s.id().unwrap().clone(), endpoint: NodeEndpoint { address: address, udp_port: address.port() } }; @@ -735,14 +806,17 @@ impl Host where Message: Send + Sync + Clone { if kill { self.kill_connection(token, io, true); } + let handlers = self.handlers.read().unwrap(); for p in ready_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); + let h = handlers.get(p).unwrap().clone(); self.stats.inc_sessions(); - h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token); } for (p, packet_id, data) in packet_data { - let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]); + let h = handlers.get(p).unwrap().clone(); + let reserved = self.reserved_nodes.read().unwrap(); + h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone(), &reserved), &token, packet_id, &data[1..]); } } @@ -783,7 +857,8 @@ impl Host where Message: Send + Sync + Clone { } for p in to_disconnect { let h = self.handlers.read().unwrap().get(p).unwrap().clone(); - h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token); + let reserved = self.reserved_nodes.read().unwrap(); + h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } if deregister { io.deregister_stream(token).unwrap_or_else(|e| debug!("Error deregistering stream: {:?}", e)); @@ -886,7 +961,10 @@ impl IoHandler> for Host where Messa _ => match self.timers.read().unwrap().get(&token).cloned() { Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() { None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) }, - Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); } + Some(h) => { + let reserved = self.reserved_nodes.read().unwrap(); + h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone(), &reserved), timer.token); + } }, None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us } @@ -904,7 +982,8 @@ impl IoHandler> for Host where Messa ref versions } => { let h = handler.clone(); - h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone())); + let reserved = self.reserved_nodes.read().unwrap(); + h.initialize(&NetworkContext::new(io, protocol, None, self.sessions.clone(), &reserved)); self.handlers.write().unwrap().insert(protocol, h); let mut info = self.info.write().unwrap(); for v in versions { @@ -946,8 +1025,9 @@ impl IoHandler> for Host where Messa self.kill_connection(*peer, io, false); }, NetworkIoMessage::User(ref message) => { + let reserved = self.reserved_nodes.read().unwrap(); for (p, h) in self.handlers.read().unwrap().iter() { - h.message(&NetworkContext::new(io, p, None, self.sessions.clone()), &message); + h.message(&NetworkContext::new(io, p, None, self.sessions.clone(), &reserved), &message); } } } diff --git a/util/src/network/mod.rs b/util/src/network/mod.rs index d074e6631..d59ab63b1 100644 --- a/util/src/network/mod.rs +++ b/util/src/network/mod.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -//! Network and general IO module. -//! +//! Network and general IO module. +//! //! Example usage for craeting a network service and adding an IO handler: //! //! ```rust @@ -112,3 +112,22 @@ pub trait NetworkProtocolHandler: Sync + Send where Message: Send + Syn fn message(&self, _io: &NetworkContext, _message: &Message) {} } +/// Non-reserved peer modes. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NonReservedPeerMode { + /// Accept them. This is the default. + Accept, + /// Deny them. + Deny, +} + +impl NonReservedPeerMode { + /// Attempt to parse the peer mode from a string. + pub fn parse(s: &str) -> Option { + match s { + "accept" => Some(NonReservedPeerMode::Accept), + "deny" => Some(NonReservedPeerMode::Deny), + _ => None, + } + } +} \ No newline at end of file diff --git a/util/src/network/service.rs b/util/src/network/service.rs index e8db354d4..353a24bbe 100644 --- a/util/src/network/service.rs +++ b/util/src/network/service.rs @@ -18,9 +18,9 @@ use std::sync::*; use error::*; use panics::*; use network::{NetworkProtocolHandler, NetworkConfiguration}; -use network::error::{NetworkError}; +use network::error::NetworkError; use network::host::{Host, NetworkIoMessage, ProtocolId}; -use network::stats::{NetworkStats}; +use network::stats::NetworkStats; use io::*; /// IO Service with networking @@ -111,6 +111,35 @@ impl NetworkService where Message: Send + Sync + Clone + 'stat *host = None; Ok(()) } + + /// Try to add a reserved peer. + pub fn add_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.add_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Try to remove a reserved peer. + pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), UtilError> { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + host.remove_reserved_node(peer) + } else { + Ok(()) + } + } + + /// Set the non-reserved peer mode. + pub fn set_non_reserved_mode(&self, mode: ::network::NonReservedPeerMode) { + let host = self.host.read().unwrap(); + if let Some(ref host) = *host { + let io_ctxt = IoContext::new(self.io_service.channel(), 0); + host.set_non_reserved_mode(mode, &io_ctxt); + } + } } impl MayPanic for NetworkService where Message: Send + Sync + Clone + 'static {