Merge branch 'master' into csp-fix

This commit is contained in:
Tomasz Drwięga
2017-07-11 13:45:23 +02:00
118 changed files with 3716 additions and 667 deletions

View File

@@ -71,7 +71,7 @@ pub fn execute(cmd: AccountCmd) -> Result<String, String> {
}
fn keys_dir(path: String, spec: SpecType) -> Result<RootDiskDirectory, String> {
let spec = spec.spec()?;
let spec = spec.spec(&::std::env::temp_dir())?;
let mut path = PathBuf::from(&path);
path.push(spec.data_dir);
RootDiskDirectory::create(path).map_err(|e| format!("Could not open keys directory: {}", e))

View File

@@ -29,7 +29,7 @@ use ethcore::error::ImportError;
use ethcore::miner::Miner;
use ethcore::verification::queue::VerifierSettings;
use cache::CacheConfig;
use informant::{Informant, MillisecondDuration};
use informant::{Informant, FullNodeInformantData, MillisecondDuration};
use params::{SpecType, Pruning, Switch, tracing_switch_to_bool, fatdb_switch_to_bool};
use helpers::{to_client_config, execute_upgrades};
use dir::Directories;
@@ -148,7 +148,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
let timer = Instant::now();
// load spec file
let spec = cmd.spec.spec()?;
let spec = cmd.spec.spec(&cmd.dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();
@@ -238,7 +238,17 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
}
};
let informant = Arc::new(Informant::new(client.clone(), None, None, None, None, cmd.with_color));
let informant = Arc::new(Informant::new(
FullNodeInformantData {
client: client.clone(),
sync: None,
net: None,
},
None,
None,
cmd.with_color,
));
service.register_io_handler(informant).map_err(|_| "Unable to register informant handler".to_owned())?;
let do_import = |bytes| {
@@ -320,7 +330,7 @@ fn start_client(
) -> Result<ClientService, String> {
// load spec file
let spec = spec.spec()?;
let spec = spec.spec(&dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();
@@ -517,7 +527,7 @@ fn execute_export_state(cmd: ExportState) -> Result<(), String> {
}
pub fn kill_db(cmd: KillBlockchain) -> Result<(), String> {
let spec = cmd.spec.spec()?;
let spec = cmd.spec.spec(&cmd.dirs.cache)?;
let genesis_hash = spec.genesis_header().hash();
let db_dirs = cmd.dirs.database(genesis_hash, None, spec.data_dir);
let user_defaults_path = db_dirs.user_defaults_path();

View File

@@ -78,6 +78,7 @@ disable_periodic = true
jit = false
[misc]
ntp_server = "pool.ntp.org:123"
logging = "own_tx=trace"
log_file = "/var/log/parity.log"
color = true

View File

@@ -180,8 +180,10 @@ usage! {
or |c: &Config| otry!(c.rpc).apis.as_ref().map(|vec| vec.join(",")),
flag_jsonrpc_hosts: String = "none",
or |c: &Config| otry!(c.rpc).hosts.as_ref().map(|vec| vec.join(",")),
flag_jsonrpc_threads: Option<usize> = None,
or |c: &Config| otry!(c.rpc).threads.map(Some),
flag_jsonrpc_server_threads: Option<usize> = None,
or |c: &Config| otry!(c.rpc).server_threads.map(Some),
flag_jsonrpc_threads: usize = 0usize,
or |c: &Config| otry!(c.rpc).processing_threads,
// WS
flag_no_ws: bool = false,
@@ -250,6 +252,8 @@ usage! {
or |c: &Config| otry!(c.mining).force_sealing.clone(),
flag_reseal_on_txs: String = "own",
or |c: &Config| otry!(c.mining).reseal_on_txs.clone(),
flag_reseal_on_uncle: bool = false,
or |c: &Config| otry!(c.mining).reseal_on_uncle.clone(),
flag_reseal_min_period: u64 = 2000u64,
or |c: &Config| otry!(c.mining).reseal_min_period.clone(),
flag_reseal_max_period: u64 = 120000u64,
@@ -350,6 +354,8 @@ usage! {
or |c: &Config| otry!(c.vm).jit.clone(),
// -- Miscellaneous Options
flag_ntp_server: String = "pool.ntp.org:123",
or |c: &Config| otry!(c.misc).ntp_server.clone(),
flag_logging: Option<String> = None,
or |c: &Config| otry!(c.misc).logging.clone().map(Some),
flag_log_file: Option<String> = None,
@@ -466,7 +472,8 @@ struct Rpc {
cors: Option<String>,
apis: Option<Vec<String>>,
hosts: Option<Vec<String>>,
threads: Option<usize>,
server_threads: Option<usize>,
processing_threads: Option<usize>,
}
#[derive(Default, Debug, PartialEq, Deserialize)]
@@ -524,6 +531,7 @@ struct Mining {
author: Option<String>,
engine_signer: Option<String>,
force_sealing: Option<bool>,
reseal_on_uncle: Option<bool>,
reseal_on_txs: Option<String>,
reseal_min_period: Option<u64>,
reseal_max_period: Option<u64>,
@@ -584,6 +592,7 @@ struct VM {
#[derive(Default, Debug, PartialEq, Deserialize)]
struct Misc {
ntp_server: Option<String>,
logging: Option<String>,
log_file: Option<String>,
color: Option<bool>,
@@ -746,7 +755,8 @@ mod tests {
flag_jsonrpc_cors: Some("null".into()),
flag_jsonrpc_apis: "web3,eth,net,parity,traces,rpc,secretstore".into(),
flag_jsonrpc_hosts: "none".into(),
flag_jsonrpc_threads: None,
flag_jsonrpc_server_threads: None,
flag_jsonrpc_threads: 0,
// WS
flag_no_ws: false,
@@ -788,6 +798,7 @@ mod tests {
flag_reseal_on_txs: "all".into(),
flag_reseal_min_period: 4000u64,
flag_reseal_max_period: 60000u64,
flag_reseal_on_uncle: false,
flag_work_queue_size: 20usize,
flag_tx_gas_limit: Some("6283184".into()),
flag_tx_time_limit: Some(100u64),
@@ -882,6 +893,7 @@ mod tests {
flag_dapps_apis_all: None,
// -- Miscellaneous Options
flag_ntp_server: "pool.ntp.org:123".into(),
flag_version: false,
flag_logging: Some("own_tx=trace".into()),
flag_log_file: Some("/var/log/parity.log".into()),
@@ -973,7 +985,8 @@ mod tests {
cors: None,
apis: None,
hosts: None,
threads: None,
server_threads: None,
processing_threads: None,
}),
ipc: Some(Ipc {
disable: None,
@@ -1012,6 +1025,7 @@ mod tests {
engine_signer: Some("0xdeadbeefcafe0000000000000000000000000001".into()),
force_sealing: Some(true),
reseal_on_txs: Some("all".into()),
reseal_on_uncle: None,
reseal_min_period: Some(4000),
reseal_max_period: Some(60000),
work_queue_size: None,
@@ -1056,6 +1070,7 @@ mod tests {
jit: Some(false),
}),
misc: Some(Misc {
ntp_server: Some("pool.ntp.org:123".into()),
logging: Some("own_tx=trace".into()),
log_file: Some("/var/log/parity.log".into()),
color: Some(true),

View File

@@ -176,9 +176,12 @@ API and Console Options:
is additional security against some attack
vectors. Special options: "all", "none",
(default: {flag_jsonrpc_hosts}).
--jsonrpc-threads THREADS Enables experimental faster implementation of JSON-RPC server.
--jsonrpc-server-threads NUM Enables experimental faster implementation of JSON-RPC server.
Requires Dapps server to be disabled
using --no-dapps. (default: {flag_jsonrpc_threads:?})
using --no-dapps. (default: {flag_jsonrpc_server_threads:?})
--jsonrpc-threads THREADS Turn on additional processing threads in all RPC servers.
Setting this to non-zero value allows parallel cpu-heavy queries
execution. (default: {flag_jsonrpc_threads})
--no-ws Disable the WebSockets server. (default: {flag_no_ws})
--ws-port PORT Specify the port portion of the WebSockets server
@@ -262,6 +265,9 @@ Sealing/Mining Options:
ext - reseal only on a new external transaction;
all - reseal on all new transactions
(default: {flag_reseal_on_txs}).
--reseal-on-uncle Force the node to author new blocks when a new uncle
block is imported.
(default: {flag_reseal_on_uncle})
--reseal-min-period MS Specify the minimum time between reseals from
incoming transactions. MS is time measured in
milliseconds (default: {flag_reseal_min_period}).
@@ -461,6 +467,8 @@ Internal Options:
--can-restart Executable will auto-restart if exiting with 69.
Miscellaneous Options:
--ntp-server HOST NTP server to provide current time (host:port). Used to verify node health.
(default: {flag_ntp_server})
-l --logging LOGGING Specify the logging level. Must conform to the same
format as RUST_LOG. (default: {flag_logging:?})
--log-file FILENAME Specify a filename into which logging should be

View File

@@ -34,7 +34,7 @@ use rpc::{IpcConfiguration, HttpConfiguration, WsConfiguration, UiConfiguration}
use rpc_apis::ApiSet;
use parity_rpc::NetworkSettings;
use cache::CacheConfig;
use helpers::{to_duration, to_mode, to_block_id, to_u256, to_pending_set, to_price, replace_home, replace_home_for_db,
use helpers::{to_duration, to_mode, to_block_id, to_u256, to_pending_set, to_price, replace_home, replace_home_and_local,
geth_ipc_path, parity_ipc_path, to_bootnodes, to_addresses, to_address, to_gas_limit, to_queue_strategy};
use params::{SpecType, ResealPolicy, AccountsConfig, GasPricerConfig, MinerExtras, Pruning, Switch};
use ethcore_logger::Config as LogConfig;
@@ -137,7 +137,7 @@ impl Configuration {
let secretstore_conf = self.secretstore_config()?;
let format = self.format()?;
if self.args.flag_jsonrpc_threads.is_some() && dapps_conf.enabled {
if self.args.flag_jsonrpc_server_threads.is_some() && dapps_conf.enabled {
dapps_conf.enabled = false;
writeln!(&mut stderr(), "Warning: Disabling Dapps server because fast RPC server was enabled.").expect("Error writing to stderr.")
}
@@ -526,6 +526,7 @@ impl Configuration {
force_sealing: self.args.flag_force_sealing,
reseal_on_external_tx: reseal.external,
reseal_on_own_tx: reseal.own,
reseal_on_uncle: self.args.flag_reseal_on_uncle,
tx_gas_limit: match self.args.flag_tx_gas_limit {
Some(ref d) => to_u256(d)?,
None => U256::max_value(),
@@ -555,6 +556,7 @@ impl Configuration {
fn ui_config(&self) -> UiConfiguration {
UiConfiguration {
enabled: self.ui_enabled(),
ntp_server: self.args.flag_ntp_server.clone(),
interface: self.ui_interface(),
port: self.args.flag_ports_shift + self.args.flag_ui_port,
hosts: self.ui_hosts(),
@@ -564,12 +566,18 @@ impl Configuration {
fn dapps_config(&self) -> DappsConfiguration {
DappsConfiguration {
enabled: self.dapps_enabled(),
ntp_server: self.args.flag_ntp_server.clone(),
dapps_path: PathBuf::from(self.directories().dapps),
extra_dapps: if self.args.cmd_dapp {
self.args.arg_path.iter().map(|path| PathBuf::from(path)).collect()
} else {
vec![]
},
extra_embed_on: if self.args.flag_ui_no_validation {
vec![("localhost".to_owned(), 3000)]
} else {
vec![]
},
}
}
@@ -662,7 +670,7 @@ impl Configuration {
let mut buffer = String::new();
let mut node_file = File::open(path).map_err(|e| format!("Error opening reserved nodes file: {}", e))?;
node_file.read_to_string(&mut buffer).map_err(|_| "Error reading reserved node file")?;
let lines = buffer.lines().map(|s| s.trim().to_owned()).filter(|s| !s.is_empty()).collect::<Vec<_>>();
let lines = buffer.lines().map(|s| s.trim().to_owned()).filter(|s| !s.is_empty() && !s.starts_with("#")).collect::<Vec<_>>();
if let Some(invalid) = lines.iter().find(|s| !is_valid_node_url(s)) {
return Err(format!("Invalid node address format given for a boot node: {}", invalid));
}
@@ -824,11 +832,12 @@ impl Configuration {
},
hosts: self.rpc_hosts(),
cors: self.rpc_cors(),
threads: match self.args.flag_jsonrpc_threads {
server_threads: match self.args.flag_jsonrpc_server_threads {
Some(threads) if threads > 0 => Some(threads),
None => None,
_ => return Err("--jsonrpc-threads number needs to be positive.".into()),
}
_ => return Err("--jsonrpc-server-threads number needs to be positive.".into()),
},
processing_threads: self.args.flag_jsonrpc_threads,
};
Ok(conf)
@@ -893,14 +902,20 @@ impl Configuration {
let local_path = default_local_path();
let base_path = self.args.flag_base_path.as_ref().or_else(|| self.args.flag_datadir.as_ref()).map_or_else(|| default_data_path(), |s| s.clone());
let data_path = replace_home("", &base_path);
let base_db_path = if self.args.flag_base_path.is_some() && self.args.flag_db_path.is_none() {
// If base_path is set and db_path is not we default to base path subdir instead of LOCAL.
let is_using_base_path = self.args.flag_base_path.is_some();
// If base_path is set and db_path is not we default to base path subdir instead of LOCAL.
let base_db_path = if is_using_base_path && self.args.flag_db_path.is_none() {
"$BASE/chains"
} else {
self.args.flag_db_path.as_ref().map_or(dir::CHAINS_PATH, |s| &s)
};
let cache_path = if is_using_base_path {
"$BASE/cache".into()
} else {
replace_home_and_local(&data_path, &local_path, &dir::CACHE_PATH)
};
let db_path = replace_home_for_db(&data_path, &local_path, &base_db_path);
let db_path = replace_home_and_local(&data_path, &local_path, &base_db_path);
let keys_path = replace_home(&data_path, &self.args.flag_keys_path);
let dapps_path = replace_home(&data_path, &self.args.flag_dapps_path);
let secretstore_path = replace_home(&data_path, &self.args.flag_secretstore_path);
@@ -924,6 +939,7 @@ impl Configuration {
Directories {
keys: keys_path,
base: data_path,
cache: cache_path,
db: db_path,
dapps: dapps_path,
signer: ui_path,
@@ -1256,6 +1272,7 @@ mod tests {
support_token_api: true
}, UiConfiguration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
@@ -1496,6 +1513,7 @@ mod tests {
assert_eq!(conf0.directories().signer, "signer".to_owned());
assert_eq!(conf0.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
@@ -1504,14 +1522,17 @@ mod tests {
assert_eq!(conf1.directories().signer, "signer".to_owned());
assert_eq!(conf1.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
interface: "127.0.0.1".into(),
port: 8180,
hosts: Some(vec![]),
});
assert_eq!(conf1.dapps_config().extra_embed_on, vec![("localhost".to_owned(), 3000)]);
assert_eq!(conf1.ws_config().unwrap().hosts, None);
assert_eq!(conf2.directories().signer, "signer".to_owned());
assert_eq!(conf2.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
interface: "127.0.0.1".into(),
port: 3123,
hosts: Some(vec![]),
@@ -1520,6 +1541,7 @@ mod tests {
assert_eq!(conf3.directories().signer, "signer".to_owned());
assert_eq!(conf3.ui_config(), UiConfiguration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
interface: "test".into(),
port: 8180,
hosts: Some(vec![]),
@@ -1554,6 +1576,19 @@ mod tests {
assert!(conf.init_reserved_nodes().is_ok());
}
#[test]
fn should_ignore_comments_in_reserved_peers() {
let temp = RandomTempPath::new();
create_dir(temp.as_str().to_owned()).unwrap();
let filename = temp.as_str().to_owned() + "/peers_comments";
File::create(filename.clone()).unwrap().write_all(b"# Sample comment\nenode://6f8a80d14311c39f35f516fa664deaaaa13e85b2f7493f37f6144d86991ec012937307647bd3b9a82abe2974e1407241d54947bbb39763a4cac9f77166ad92a0@172.0.0.1:30303\n").unwrap();
let args = vec!["parity", "--reserved-peers", &filename];
let conf = Configuration::parse(&args, None).unwrap();
let reserved_nodes = conf.init_reserved_nodes();
assert!(reserved_nodes.is_ok());
assert_eq!(reserved_nodes.unwrap().len(), 1);
}
#[test]
fn test_dev_chain() {
let args = vec!["parity", "--chain", "dev"];

View File

@@ -22,6 +22,7 @@ use ethcore::client::{Client, BlockChainClient, BlockId};
use ethcore::transaction::{Transaction, Action};
use ethsync::LightSync;
use futures::{future, IntoFuture, Future, BoxFuture};
use futures_cpupool::CpuPool;
use hash_fetch::fetch::Client as FetchClient;
use hash_fetch::urlhint::ContractClient;
use helpers::replace_home;
@@ -35,8 +36,10 @@ use util::{Bytes, Address};
#[derive(Debug, PartialEq, Clone)]
pub struct Configuration {
pub enabled: bool,
pub ntp_server: String,
pub dapps_path: PathBuf,
pub extra_dapps: Vec<PathBuf>,
pub extra_embed_on: Vec<(String, u16)>,
}
impl Default for Configuration {
@@ -44,8 +47,10 @@ impl Default for Configuration {
let data_dir = default_data_path();
Configuration {
enabled: true,
ntp_server: "pool.ntp.org:123".into(),
dapps_path: replace_home(&data_dir, "$BASE/dapps").into(),
extra_dapps: vec![],
extra_embed_on: vec![],
}
}
}
@@ -140,6 +145,7 @@ pub struct Dependencies {
pub sync_status: Arc<SyncStatus>,
pub contract_client: Arc<ContractClient>,
pub remote: parity_reactor::TokioRemote,
pub pool: CpuPool,
pub fetch: FetchClient,
pub signer: Arc<SignerService>,
pub ui_address: Option<(String, u16)>,
@@ -152,20 +158,23 @@ pub fn new(configuration: Configuration, deps: Dependencies) -> Result<Option<Mi
server::dapps_middleware(
deps,
&configuration.ntp_server,
configuration.dapps_path,
configuration.extra_dapps,
rpc::DAPPS_DOMAIN.into(),
rpc::DAPPS_DOMAIN,
configuration.extra_embed_on,
).map(Some)
}
pub fn new_ui(enabled: bool, deps: Dependencies) -> Result<Option<Middleware>, String> {
pub fn new_ui(enabled: bool, ntp_server: &str, deps: Dependencies) -> Result<Option<Middleware>, String> {
if !enabled {
return Ok(None);
}
server::ui_middleware(
deps,
rpc::DAPPS_DOMAIN.into(),
ntp_server,
rpc::DAPPS_DOMAIN,
).map(Some)
}
@@ -192,16 +201,19 @@ mod server {
pub fn dapps_middleware(
_deps: Dependencies,
_ntp_server: &str,
_dapps_path: PathBuf,
_extra_dapps: Vec<PathBuf>,
_dapps_domain: String,
_dapps_domain: &str,
_extra_embed_on: Vec<(String, u16)>,
) -> Result<Middleware, String> {
Err("Your Parity version has been compiled without WebApps support.".into())
}
pub fn ui_middleware(
_deps: Dependencies,
_dapps_domain: String,
_ntp_server: &str,
_dapps_domain: &str,
) -> Result<Middleware, String> {
Err("Your Parity version has been compiled without UI support.".into())
}
@@ -226,17 +238,22 @@ mod server {
pub fn dapps_middleware(
deps: Dependencies,
ntp_server: &str,
dapps_path: PathBuf,
extra_dapps: Vec<PathBuf>,
dapps_domain: String,
dapps_domain: &str,
extra_embed_on: Vec<(String, u16)>,
) -> Result<Middleware, String> {
let signer = deps.signer;
let parity_remote = parity_reactor::Remote::new(deps.remote.clone());
let web_proxy_tokens = Arc::new(move |token| signer.web_proxy_access_token_domain(&token));
Ok(parity_dapps::Middleware::dapps(
ntp_server,
deps.pool,
parity_remote,
deps.ui_address,
extra_embed_on,
dapps_path,
extra_dapps,
dapps_domain,
@@ -249,15 +266,18 @@ mod server {
pub fn ui_middleware(
deps: Dependencies,
dapps_domain: String,
ntp_server: &str,
dapps_domain: &str,
) -> Result<Middleware, String> {
let parity_remote = parity_reactor::Remote::new(deps.remote.clone());
Ok(parity_dapps::Middleware::ui(
ntp_server,
deps.pool,
parity_remote,
dapps_domain,
deps.contract_client,
deps.sync_status,
deps.fetch,
dapps_domain,
))
}

View File

@@ -18,7 +18,7 @@ use std::fs;
use std::path::{PathBuf, Path};
use util::{H64, H256};
use util::journaldb::Algorithm;
use helpers::{replace_home, replace_home_for_db};
use helpers::{replace_home, replace_home_and_local};
use app_dirs::{AppInfo, get_app_root, AppDataType};
#[cfg(target_os = "macos")] const AUTHOR: &'static str = "Parity";
@@ -34,6 +34,9 @@ use app_dirs::{AppInfo, get_app_root, AppDataType};
#[cfg(target_os = "windows")] pub const CHAINS_PATH: &'static str = "$LOCAL/chains";
#[cfg(not(target_os = "windows"))] pub const CHAINS_PATH: &'static str = "$BASE/chains";
#[cfg(target_os = "windows")] pub const CACHE_PATH: &'static str = "$LOCAL/cache";
#[cfg(not(target_os = "windows"))] pub const CACHE_PATH: &'static str = "$BASE/cache";
// this const is irrelevent cause we do have migrations now,
// but we still use it for backwards compatibility
const LEGACY_CLIENT_DB_VER_STR: &'static str = "5.3";
@@ -42,6 +45,7 @@ const LEGACY_CLIENT_DB_VER_STR: &'static str = "5.3";
pub struct Directories {
pub base: String,
pub db: String,
pub cache: String,
pub keys: String,
pub signer: String,
pub dapps: String,
@@ -54,7 +58,8 @@ impl Default for Directories {
let local_dir = default_local_path();
Directories {
base: replace_home(&data_dir, "$BASE"),
db: replace_home_for_db(&data_dir, &local_dir, CHAINS_PATH),
db: replace_home_and_local(&data_dir, &local_dir, CHAINS_PATH),
cache: replace_home_and_local(&data_dir, &local_dir, CACHE_PATH),
keys: replace_home(&data_dir, "$BASE/keys"),
signer: replace_home(&data_dir, "$BASE/signer"),
dapps: replace_home(&data_dir, "$BASE/dapps"),
@@ -67,6 +72,7 @@ impl Directories {
pub fn create_dirs(&self, dapps_enabled: bool, signer_enabled: bool, secretstore_enabled: bool) -> Result<(), String> {
fs::create_dir_all(&self.base).map_err(|e| e.to_string())?;
fs::create_dir_all(&self.db).map_err(|e| e.to_string())?;
fs::create_dir_all(&self.cache).map_err(|e| e.to_string())?;
fs::create_dir_all(&self.keys).map_err(|e| e.to_string())?;
if signer_enabled {
fs::create_dir_all(&self.signer).map_err(|e| e.to_string())?;
@@ -231,7 +237,7 @@ pub fn default_hypervisor_path() -> String {
#[cfg(test)]
mod tests {
use super::Directories;
use helpers::{replace_home, replace_home_for_db};
use helpers::{replace_home, replace_home_and_local};
#[test]
fn test_default_directories() {
@@ -239,10 +245,14 @@ mod tests {
let local_dir = super::default_local_path();
let expected = Directories {
base: replace_home(&data_dir, "$BASE"),
db: replace_home_for_db(&data_dir, &local_dir,
db: replace_home_and_local(&data_dir, &local_dir,
if cfg!(target_os = "windows") { "$LOCAL/chains" }
else { "$BASE/chains" }
),
cache: replace_home_and_local(&data_dir, &local_dir,
if cfg!(target_os = "windows") { "$LOCAL/cache" }
else { "$BASE/cache" }
),
keys: replace_home(&data_dir, "$BASE/keys"),
signer: replace_home(&data_dir, "$BASE/signer"),
dapps: replace_home(&data_dir, "$BASE/dapps"),

View File

@@ -140,7 +140,7 @@ pub fn replace_home(base: &str, arg: &str) -> String {
r.replace("/", &::std::path::MAIN_SEPARATOR.to_string())
}
pub fn replace_home_for_db(base: &str, local: &str, arg: &str) -> String {
pub fn replace_home_and_local(base: &str, local: &str, arg: &str) -> String {
let r = replace_home(base, arg);
r.replace("$LOCAL", local)
}

View File

@@ -21,32 +21,21 @@ use self::ansi_term::Style;
use std::sync::{Arc};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use ethcore::client::*;
use ethcore::header::BlockNumber;
use ethcore::service::ClientIoMessage;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
use ethcore::snapshot::service::Service as SnapshotService;
use ethsync::{LightSyncProvider, LightSync, SyncProvider, ManageNetwork};
use io::{TimerToken, IoContext, IoHandler};
use isatty::{stdout_isatty};
use ethsync::{SyncProvider, ManageNetwork};
use util::{RwLock, Mutex, H256, Colour, Bytes};
use ethcore::client::*;
use ethcore::service::ClientIoMessage;
use ethcore::snapshot::service::Service as SnapshotService;
use ethcore::snapshot::{RestorationStatus, SnapshotService as SS};
use light::Cache as LightDataCache;
use light::client::LightChainClient;
use number_prefix::{binary_prefix, Standalone, Prefixed};
use parity_rpc::{is_major_importing};
use parity_rpc::informant::RpcStats;
pub struct Informant {
report: RwLock<Option<ClientReport>>,
last_tick: RwLock<Instant>,
with_color: bool,
client: Arc<Client>,
snapshot: Option<Arc<SnapshotService>>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
rpc_stats: Option<Arc<RpcStats>>,
last_import: Mutex<Instant>,
skipped: AtomicUsize,
skipped_txs: AtomicUsize,
in_shutdown: AtomicBool,
}
use util::{RwLock, Mutex, H256, Colour, Bytes};
/// Format byte counts to standard denominations.
pub fn format_bytes(b: usize) -> String {
@@ -68,29 +57,188 @@ impl MillisecondDuration for Duration {
}
}
impl Informant {
#[derive(Default)]
struct CacheSizes {
sizes: ::std::collections::BTreeMap<&'static str, usize>,
}
impl CacheSizes {
fn insert(&mut self, key: &'static str, bytes: usize) {
self.sizes.insert(key, bytes);
}
fn display<F>(&self, style: Style, paint: F) -> String
where F: Fn(Style, String) -> String
{
use std::fmt::Write;
let mut buf = String::new();
for (name, &size) in &self.sizes {
write!(buf, " {:>8} {}", paint(style, format_bytes(size)), name)
.expect("writing to string won't fail unless OOM; qed")
}
buf
}
}
pub struct SyncInfo {
last_imported_block_number: BlockNumber,
last_imported_old_block_number: Option<BlockNumber>,
num_peers: usize,
max_peers: u32,
}
pub struct Report {
importing: bool,
chain_info: BlockChainInfo,
client_report: ClientReport,
queue_info: BlockQueueInfo,
cache_sizes: CacheSizes,
sync_info: Option<SyncInfo>,
}
/// Something which can provide data to the informant.
pub trait InformantData: Send + Sync {
/// Whether it executes transactions
fn executes_transactions(&self) -> bool;
/// Whether it is currently importing (also included in `Report`)
fn is_major_importing(&self) -> bool;
/// Generate a report of blockchain status, memory usage, and sync info.
fn report(&self) -> Report;
}
/// Informant data for a full node.
pub struct FullNodeInformantData {
pub client: Arc<Client>,
pub sync: Option<Arc<SyncProvider>>,
pub net: Option<Arc<ManageNetwork>>,
}
impl InformantData for FullNodeInformantData {
fn executes_transactions(&self) -> bool { true }
fn is_major_importing(&self) -> bool {
let state = self.sync.as_ref().map(|sync| sync.status().state);
is_major_importing(state, self.client.queue_info())
}
fn report(&self) -> Report {
let (client_report, queue_info, blockchain_cache_info) =
(self.client.report(), self.client.queue_info(), self.client.blockchain_cache_info());
let chain_info = self.client.chain_info();
let mut cache_sizes = CacheSizes::default();
cache_sizes.insert("db", client_report.state_db_mem);
cache_sizes.insert("queue", queue_info.mem_used);
cache_sizes.insert("chain", blockchain_cache_info.total());
let (importing, sync_info) = match (self.sync.as_ref(), self.net.as_ref()) {
(Some(sync), Some(net)) => {
let status = sync.status();
let net_config = net.network_config();
cache_sizes.insert("sync", status.mem_used);
let importing = is_major_importing(Some(status.state), queue_info.clone());
(importing, Some(SyncInfo {
last_imported_block_number: status.last_imported_block_number.unwrap_or(chain_info.best_block_number),
last_imported_old_block_number: status.last_imported_old_block_number,
num_peers: status.num_peers,
max_peers: status.current_max_peers(net_config.min_peers, net_config.max_peers),
}))
}
_ => (is_major_importing(None, queue_info.clone()), None),
};
Report {
importing,
chain_info,
client_report,
queue_info,
cache_sizes,
sync_info,
}
}
}
/// Informant data for a light node -- note that the network is required.
pub struct LightNodeInformantData {
pub client: Arc<LightChainClient>,
pub sync: Arc<LightSync>,
pub cache: Arc<Mutex<LightDataCache>>,
}
impl InformantData for LightNodeInformantData {
fn executes_transactions(&self) -> bool { false }
fn is_major_importing(&self) -> bool {
self.sync.is_major_importing()
}
fn report(&self) -> Report {
let (client_report, queue_info, chain_info) =
(self.client.report(), self.client.queue_info(), self.client.chain_info());
let mut cache_sizes = CacheSizes::default();
cache_sizes.insert("queue", queue_info.mem_used);
cache_sizes.insert("cache", self.cache.lock().mem_used());
let peer_numbers = self.sync.peer_numbers();
let sync_info = Some(SyncInfo {
last_imported_block_number: chain_info.best_block_number,
last_imported_old_block_number: None,
num_peers: peer_numbers.connected,
max_peers: peer_numbers.max as u32,
});
Report {
importing: self.sync.is_major_importing(),
chain_info,
client_report,
queue_info,
cache_sizes,
sync_info,
}
}
}
pub struct Informant<T> {
last_tick: RwLock<Instant>,
with_color: bool,
target: T,
snapshot: Option<Arc<SnapshotService>>,
rpc_stats: Option<Arc<RpcStats>>,
last_import: Mutex<Instant>,
skipped: AtomicUsize,
skipped_txs: AtomicUsize,
in_shutdown: AtomicBool,
last_report: Mutex<ClientReport>,
}
impl<T: InformantData> Informant<T> {
/// Make a new instance potentially `with_color` output.
pub fn new(
client: Arc<Client>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
target: T,
snapshot: Option<Arc<SnapshotService>>,
rpc_stats: Option<Arc<RpcStats>>,
with_color: bool,
) -> Self {
Informant {
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
with_color: with_color,
client: client,
target: target,
snapshot: snapshot,
sync: sync,
net: net,
rpc_stats: rpc_stats,
last_import: Mutex::new(Instant::now()),
skipped: AtomicUsize::new(0),
skipped_txs: AtomicUsize::new(0),
in_shutdown: AtomicBool::new(false),
last_report: Mutex::new(Default::default()),
}
}
@@ -106,14 +254,24 @@ impl Informant {
return;
}
let chain_info = self.client.chain_info();
let queue_info = self.client.queue_info();
let cache_info = self.client.blockchain_cache_info();
let network_config = self.net.as_ref().map(|n| n.network_config());
let sync_status = self.sync.as_ref().map(|s| s.status());
let Report {
importing,
chain_info,
client_report,
queue_info,
cache_sizes,
sync_info,
} = self.target.report();
let client_report = {
let mut last_report = self.last_report.lock();
let diffed = client_report.clone() - &*last_report;
*last_report = client_report.clone();
diffed
};
let rpc_stats = self.rpc_stats.as_ref();
let importing = is_major_importing(sync_status.map(|s| s.state), self.client.queue_info());
let (snapshot_sync, snapshot_current, snapshot_total) = self.snapshot.as_ref().map_or((false, 0, 0), |s|
match s.status() {
RestorationStatus::Ongoing { state_chunks, block_chunks, state_chunks_done, block_chunks_done } =>
@@ -128,9 +286,6 @@ impl Informant {
*self.last_tick.write() = Instant::now();
let mut write_report = self.report.write();
let report = self.client.report();
let paint = |c: Style, t: String| match self.with_color && stdout_isatty() {
true => format!("{}", c.paint(t)),
false => t,
@@ -142,13 +297,16 @@ impl Informant {
false => format!("Syncing {} {} {} {}+{} Qed",
paint(White.bold(), format!("{:>8}", format!("#{}", chain_info.best_block_number))),
paint(White.bold(), format!("{}", chain_info.best_block_hash)),
{
let last_report = match *write_report { Some(ref last_report) => last_report.clone(), _ => ClientReport::default() };
if self.target.executes_transactions() {
format!("{} blk/s {} tx/s {} Mgas/s",
paint(Yellow.bold(), format!("{:4}", ((report.blocks_imported - last_report.blocks_imported) * 1000) as u64 / elapsed.as_milliseconds())),
paint(Yellow.bold(), format!("{:4}", ((report.transactions_applied - last_report.transactions_applied) * 1000) as u64 / elapsed.as_milliseconds())),
paint(Yellow.bold(), format!("{:3}", ((report.gas_processed - last_report.gas_processed) / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
)
paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds())),
paint(Yellow.bold(), format!("{:4}", (client_report.transactions_applied * 1000) as u64 / elapsed.as_milliseconds())),
paint(Yellow.bold(), format!("{:3}", (client_report.gas_processed / From::from(elapsed.as_milliseconds() * 1000)).low_u64()))
)
} else {
format!("{} hdr/s",
paint(Yellow.bold(), format!("{:4}", (client_report.blocks_imported * 1000) as u64 / elapsed.as_milliseconds()))
)
},
paint(Green.bold(), format!("{:5}", queue_info.unverified_queue_size)),
paint(Green.bold(), format!("{:5}", queue_info.verified_queue_size))
@@ -157,29 +315,21 @@ impl Informant {
},
false => String::new(),
},
match (&sync_status, &network_config) {
(&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{} peers",
match sync_info.as_ref() {
Some(ref sync_info) => format!("{}{}/{} peers",
match importing {
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))),
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number)))),
false => match sync_info.last_imported_old_block_number {
Some(number) => format!("{} ", paint(Yellow.bold(), format!("{:>8}", format!("#{}", number)))),
None => String::new(),
}
},
paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
paint(Cyan.bold(), format!("{:2}", sync_info.current_max_peers(net_config.min_peers, net_config.max_peers))),
paint(Cyan.bold(), format!("{:2}", sync_info.max_peers)),
),
_ => String::new(),
},
format!("{} db {} chain {} queue{}",
paint(Blue.bold(), format!("{:>8}", format_bytes(report.state_db_mem))),
paint(Blue.bold(), format!("{:>8}", format_bytes(cache_info.total()))),
paint(Blue.bold(), format!("{:>8}", format_bytes(queue_info.mem_used))),
match sync_status {
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", format_bytes(sync_info.mem_used)))),
_ => String::new(),
}
),
cache_sizes.display(Blue.bold(), &paint),
match rpc_stats {
Some(ref rpc_stats) => format!(
"RPC: {} conn, {} req/s, {} µs",
@@ -190,25 +340,24 @@ impl Informant {
_ => String::new(),
},
);
*write_report = Some(report);
}
}
impl ChainNotify for Informant {
impl ChainNotify for Informant<FullNodeInformantData> {
fn new_blocks(&self, imported: Vec<H256>, _invalid: Vec<H256>, _enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, _proposed: Vec<Bytes>, duration: u64) {
let mut last_import = self.last_import.lock();
let sync_state = self.sync.as_ref().map(|s| s.status().state);
let importing = is_major_importing(sync_state, self.client.queue_info());
let client = &self.target.client;
let importing = self.target.is_major_importing();
let ripe = Instant::now() > *last_import + Duration::from_secs(1) && !importing;
let txs_imported = imported.iter()
.take(imported.len().saturating_sub(if ripe { 1 } else { 0 }))
.filter_map(|h| self.client.block(BlockId::Hash(*h)))
.filter_map(|h| client.block(BlockId::Hash(*h)))
.map(|b| b.transactions_count())
.sum();
if ripe {
if let Some(block) = imported.last().and_then(|h| self.client.block(BlockId::Hash(*h))) {
if let Some(block) = imported.last().and_then(|h| client.block(BlockId::Hash(*h))) {
let header_view = block.header_view();
let size = block.rlp().as_raw().len();
let (skipped, skipped_txs) = (self.skipped.load(AtomicOrdering::Relaxed) + imported.len() - 1, self.skipped_txs.load(AtomicOrdering::Relaxed) + txs_imported);
@@ -241,7 +390,7 @@ impl ChainNotify for Informant {
const INFO_TIMER: TimerToken = 0;
impl IoHandler<ClientIoMessage> for Informant {
impl<T: InformantData> IoHandler<ClientIoMessage> for Informant<T> {
fn initialize(&self, io: &IoContext<ClientIoMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
}

View File

@@ -29,6 +29,7 @@ extern crate docopt;
extern crate env_logger;
extern crate fdlimit;
extern crate futures;
extern crate futures_cpupool;
extern crate isatty;
extern crate jsonrpc_core;
extern crate num_cpus;

View File

@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::{str, fs, fmt};
use std::{str, fs, fmt, path};
use std::time::Duration;
use util::{Address, U256, version_data};
use util::journaldb::Algorithm;
@@ -79,19 +79,20 @@ impl fmt::Display for SpecType {
}
impl SpecType {
pub fn spec(&self) -> Result<Spec, String> {
pub fn spec<T: AsRef<path::Path>>(&self, cache_dir: T) -> Result<Spec, String> {
let cache_dir = cache_dir.as_ref();
match *self {
SpecType::Foundation => Ok(ethereum::new_foundation()),
SpecType::Morden => Ok(ethereum::new_morden()),
SpecType::Ropsten => Ok(ethereum::new_ropsten()),
SpecType::Olympic => Ok(ethereum::new_olympic()),
SpecType::Classic => Ok(ethereum::new_classic()),
SpecType::Expanse => Ok(ethereum::new_expanse()),
SpecType::Kovan => Ok(ethereum::new_kovan()),
SpecType::Foundation => Ok(ethereum::new_foundation(cache_dir)),
SpecType::Morden => Ok(ethereum::new_morden(cache_dir)),
SpecType::Ropsten => Ok(ethereum::new_ropsten(cache_dir)),
SpecType::Olympic => Ok(ethereum::new_olympic(cache_dir)),
SpecType::Classic => Ok(ethereum::new_classic(cache_dir)),
SpecType::Expanse => Ok(ethereum::new_expanse(cache_dir)),
SpecType::Kovan => Ok(ethereum::new_kovan(cache_dir)),
SpecType::Dev => Ok(Spec::new_instant()),
SpecType::Custom(ref filename) => {
let file = fs::File::open(filename).map_err(|_| "Could not load specification file.")?;
Spec::load(file)
let file = fs::File::open(filename).map_err(|e| format!("Could not load specification file at {}: {}", filename, e))?;
Spec::load(cache_dir, file)
}
}
}

View File

@@ -30,6 +30,7 @@ use rpc_apis::{self, ApiSet};
pub use parity_rpc::{IpcServer, HttpServer, RequestMiddleware};
pub use parity_rpc::ws::Server as WsServer;
pub use parity_rpc::informant::CpuPool;
pub const DAPPS_DOMAIN: &'static str = "web3.site";
@@ -42,7 +43,8 @@ pub struct HttpConfiguration {
pub apis: ApiSet,
pub cors: Option<Vec<String>>,
pub hosts: Option<Vec<String>>,
pub threads: Option<usize>,
pub server_threads: Option<usize>,
pub processing_threads: usize,
}
impl HttpConfiguration {
@@ -63,7 +65,8 @@ impl Default for HttpConfiguration {
apis: ApiSet::UnsafeContext,
cors: None,
hosts: Some(Vec::new()),
threads: None,
server_threads: None,
processing_threads: 0,
}
}
}
@@ -71,6 +74,7 @@ impl Default for HttpConfiguration {
#[derive(Debug, PartialEq, Clone)]
pub struct UiConfiguration {
pub enabled: bool,
pub ntp_server: String,
pub interface: String,
pub port: u16,
pub hosts: Option<Vec<String>>,
@@ -94,7 +98,8 @@ impl From<UiConfiguration> for HttpConfiguration {
apis: rpc_apis::ApiSet::SafeContext,
cors: None,
hosts: conf.hosts,
threads: None,
server_threads: None,
processing_threads: 0,
}
}
}
@@ -103,6 +108,7 @@ impl Default for UiConfiguration {
fn default() -> Self {
UiConfiguration {
enabled: true && cfg!(feature = "ui-enabled"),
ntp_server: "pool.ntp.org:123".into(),
port: 8180,
interface: "127.0.0.1".into(),
hosts: Some(vec![]),
@@ -176,6 +182,7 @@ pub struct Dependencies<D: rpc_apis::Dependencies> {
pub apis: Arc<D>,
pub remote: TokioRemote,
pub stats: Arc<RpcStats>,
pub pool: Option<CpuPool>,
}
pub fn new_ws<D: rpc_apis::Dependencies>(
@@ -192,13 +199,14 @@ pub fn new_ws<D: rpc_apis::Dependencies>(
let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?;
let full_handler = setup_apis(rpc_apis::ApiSet::SafeContext, deps);
let pool = deps.pool.clone();
let full_handler = setup_apis(rpc_apis::ApiSet::SafeContext, deps, pool.clone());
let handler = {
let mut handler = MetaIoHandler::with_middleware((
rpc::WsDispatcher::new(full_handler),
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier())
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), pool)
));
let apis = conf.apis.list_apis().into_iter().collect::<Vec<_>>();
let apis = conf.apis.list_apis();
deps.apis.extend_with_set(&mut handler, &apis);
handler
@@ -252,7 +260,8 @@ pub fn new_http<D: rpc_apis::Dependencies>(
let http_address = (conf.interface, conf.port);
let url = format!("{}:{}", http_address.0, http_address.1);
let addr = url.parse().map_err(|_| format!("Invalid {} listen host/port given: {}", id, url))?;
let handler = setup_apis(conf.apis, deps);
let pool = deps.pool.clone();
let handler = setup_apis(conf.apis, deps, pool);
let remote = deps.remote.clone();
let cors_domains = into_domains(conf.cors);
@@ -265,7 +274,7 @@ pub fn new_http<D: rpc_apis::Dependencies>(
handler,
remote,
rpc::RpcExtractor,
match (conf.threads, middleware) {
match (conf.server_threads, middleware) {
(Some(threads), None) => rpc::HttpSettings::Threads(threads),
(None, middleware) => rpc::HttpSettings::Dapps(middleware),
(Some(_), Some(_)) => {
@@ -291,7 +300,8 @@ pub fn new_ipc<D: rpc_apis::Dependencies>(
return Ok(None);
}
let handler = setup_apis(conf.apis, dependencies);
let pool = dependencies.pool.clone();
let handler = setup_apis(conf.apis, dependencies, pool);
let remote = dependencies.remote.clone();
match rpc::start_ipc(&conf.socket_addr, handler, remote, rpc::RpcExtractor) {
Ok(server) => Ok(Some(server)),
@@ -318,13 +328,13 @@ fn with_domain(items: Option<Vec<String>>, domain: &str, addresses: &[Option<(St
})
}
fn setup_apis<D>(apis: ApiSet, deps: &Dependencies<D>) -> MetaIoHandler<Metadata, Middleware<D::Notifier>>
fn setup_apis<D>(apis: ApiSet, deps: &Dependencies<D>, pool: Option<CpuPool>) -> MetaIoHandler<Metadata, Middleware<D::Notifier>>
where D: rpc_apis::Dependencies
{
let mut handler = MetaIoHandler::with_middleware(
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier())
Middleware::new(deps.stats.clone(), deps.apis.activity_notifier(), pool)
);
let apis = apis.list_apis().into_iter().collect::<Vec<_>>();
let apis = apis.list_apis();
deps.apis.extend_with_set(&mut handler, &apis);
handler

View File

@@ -106,6 +106,8 @@ pub enum ApiSet {
All,
// Local "unsafe" context and accounts access
IpcContext,
// APIs for Parity Generic Pub-Sub
PubSub,
// Fixed list of APis
List(HashSet<Api>),
}
@@ -153,7 +155,7 @@ impl FromStr for ApiSet {
}
}
fn to_modules(apis: &[Api]) -> BTreeMap<String, String> {
fn to_modules(apis: &HashSet<Api>) -> BTreeMap<String, String> {
let mut modules = BTreeMap::new();
for api in apis {
let (name, version) = match *api {
@@ -187,7 +189,7 @@ pub trait Dependencies {
fn extend_with_set<S>(
&self,
handler: &mut MetaIoHandler<Metadata, S>,
apis: &[Api],
apis: &HashSet<Api>,
) where S: core::Middleware<Metadata>;
}
@@ -217,7 +219,7 @@ impl FullDependencies {
fn extend_api<S>(
&self,
handler: &mut MetaIoHandler<Metadata, S>,
apis: &[Api],
apis: &HashSet<Api>,
for_generic_pubsub: bool,
) where S: core::Middleware<Metadata> {
use parity_rpc::v1::*;
@@ -305,7 +307,8 @@ impl FullDependencies {
Api::ParityPubSub => {
if !for_generic_pubsub {
let mut rpc = MetaIoHandler::default();
self.extend_api(&mut rpc, apis, true);
let apis = ApiSet::List(apis.clone()).retain(ApiSet::PubSub).list_apis();
self.extend_api(&mut rpc, &apis, true);
handler.extend_with(PubSubClient::new(rpc, self.remote.clone()).to_delegate());
}
},
@@ -349,7 +352,7 @@ impl Dependencies for FullDependencies {
fn extend_with_set<S>(
&self,
handler: &mut MetaIoHandler<Metadata, S>,
apis: &[Api],
apis: &HashSet<Api>,
) where S: core::Middleware<Metadata> {
self.extend_api(handler, apis, false)
}
@@ -386,7 +389,7 @@ impl LightDependencies {
fn extend_api<T: core::Middleware<Metadata>>(
&self,
handler: &mut MetaIoHandler<Metadata, T>,
apis: &[Api],
apis: &HashSet<Api>,
for_generic_pubsub: bool,
) {
use parity_rpc::v1::*;
@@ -486,7 +489,8 @@ impl LightDependencies {
Api::ParityPubSub => {
if !for_generic_pubsub {
let mut rpc = MetaIoHandler::default();
self.extend_api(&mut rpc, apis, true);
let apis = ApiSet::List(apis.clone()).retain(ApiSet::PubSub).list_apis();
self.extend_api(&mut rpc, &apis, true);
handler.extend_with(PubSubClient::new(rpc, self.remote.clone()).to_delegate());
}
},
@@ -525,7 +529,7 @@ impl Dependencies for LightDependencies {
fn extend_with_set<S>(
&self,
handler: &mut MetaIoHandler<Metadata, S>,
apis: &[Api],
apis: &HashSet<Api>,
) where S: core::Middleware<Metadata> {
self.extend_api(handler, apis, false)
}
@@ -538,9 +542,9 @@ impl ApiSet {
}
pub fn list_apis(&self) -> HashSet<Api> {
let mut public_list = vec![
let mut public_list = [
Api::Web3, Api::Net, Api::Eth, Api::EthPubSub, Api::Parity, Api::Rpc, Api::SecretStore,
].into_iter().collect();
].into_iter().cloned().collect();
match *self {
ApiSet::List(ref apis) => apis.clone(),
ApiSet::PublicContext => public_list,
@@ -572,6 +576,13 @@ impl ApiSet {
public_list.insert(Api::Personal);
public_list
},
ApiSet::PubSub => [
Api::Eth,
Api::Parity,
Api::ParityAccounts,
Api::ParitySet,
Api::Traces,
].into_iter().cloned().collect()
}
}
}

View File

@@ -16,26 +16,26 @@
use std::sync::Arc;
use std::net::{TcpListener};
use ctrlc::CtrlC;
use fdlimit::raise_fd_limit;
use parity_rpc::{NetworkSettings, informant, is_major_importing};
use ethsync::NetworkConfiguration;
use util::{Colour, version, Mutex, Condvar};
use ethcore_logger::{Config as LogConfig, RotatingLogger};
use ethcore::miner::{StratumOptions, Stratum};
use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient};
use ethcore::service::ClientService;
use ethcore::account_provider::{AccountProvider, AccountProviderSettings};
use ethcore::client::{Client, Mode, DatabaseCompactionProfile, VMType, BlockChainClient};
use ethcore::ethstore::ethkey;
use ethcore::miner::{Miner, MinerService, ExternalMiner, MinerOptions};
use ethcore::miner::{StratumOptions, Stratum};
use ethcore::service::ClientService;
use ethcore::snapshot;
use ethcore::verification::queue::VerifierSettings;
use ethcore::ethstore::ethkey;
use light::Cache as LightDataCache;
use ethsync::SyncConfig;
use informant::Informant;
use updater::{UpdatePolicy, Updater};
use parity_reactor::EventLoop;
use ethsync::{self, SyncConfig};
use fdlimit::raise_fd_limit;
use hash_fetch::fetch::{Fetch, Client as FetchClient};
use informant::{Informant, LightNodeInformantData, FullNodeInformantData};
use light::Cache as LightDataCache;
use parity_reactor::EventLoop;
use parity_rpc::{NetworkSettings, informant, is_major_importing};
use updater::{UpdatePolicy, Updater};
use util::{Colour, version, Mutex, Condvar};
use params::{
SpecType, Pruning, AccountsConfig, GasPricerConfig, MinerExtras, Switch,
@@ -83,7 +83,7 @@ pub struct RunCmd {
pub ws_conf: rpc::WsConfiguration,
pub http_conf: rpc::HttpConfiguration,
pub ipc_conf: rpc::IpcConfiguration,
pub net_conf: NetworkConfiguration,
pub net_conf: ethsync::NetworkConfiguration,
pub network_id: Option<u64>,
pub warp_sync: bool,
pub public_node: bool,
@@ -168,7 +168,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
use util::RwLock;
// load spec
let spec = cmd.spec.spec()?;
let spec = cmd.spec.spec(&cmd.dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();
@@ -209,6 +209,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
db_cache_size: Some(cmd.cache_config.blockchain() as usize * 1024 * 1024),
db_compaction: compaction,
db_wal: cmd.wal,
verify_full: true,
};
config.queue.max_mem_use = cmd.cache_config.queue() as usize * 1024 * 1024;
@@ -235,7 +236,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
network_config: net_conf.into_basic().map_err(|e| format!("Failed to produce network config: {}", e))?,
client: Arc::new(provider),
network_id: cmd.network_id.unwrap_or(spec.network_id()),
subprotocol_name: ::ethsync::LIGHT_PROTOCOL,
subprotocol_name: ethsync::LIGHT_PROTOCOL,
handlers: vec![on_demand.clone()],
};
let light_sync = LightSync::new(sync_params).map_err(|e| format!("Error starting network: {}", e))?;
@@ -275,9 +276,18 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
on_demand: on_demand.clone(),
});
let sync = light_sync.clone();
struct LightSyncStatus(Arc<LightSync>);
impl dapps::SyncStatus for LightSyncStatus {
fn is_major_importing(&self) -> bool { self.0.is_major_importing() }
fn peers(&self) -> (usize, usize) {
let peers = ethsync::LightSyncProvider::peer_numbers(&*self.0);
(peers.connected, peers.max)
}
}
dapps::Dependencies {
sync_status: Arc::new(move || sync.is_major_importing()),
sync_status: Arc::new(LightSyncStatus(light_sync.clone())),
pool: fetch.pool(),
contract_client: contract_client,
remote: event_loop.raw_remote(),
fetch: fetch.clone(),
@@ -287,7 +297,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
};
let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, dapps_deps)?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?;
// start RPCs
let dapps_service = dapps::service(&dapps_middleware);
@@ -300,7 +310,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
logger: logger,
settings: Arc::new(cmd.net_settings),
on_demand: on_demand,
cache: cache,
cache: cache.clone(),
transaction_queue: txq,
dapps_service: dapps_service,
dapps_address: cmd.dapps_conf.address(cmd.http_conf.address()),
@@ -314,6 +324,11 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
apis: deps_for_rpc_apis.clone(),
remote: event_loop.raw_remote(),
stats: rpc_stats.clone(),
pool: if cmd.http_conf.processing_threads > 0 {
Some(rpc::CpuPool::new(cmd.http_conf.processing_threads))
} else {
None
},
};
// start rpc servers
@@ -322,16 +337,25 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let _ipc_server = rpc::new_ipc(cmd.ipc_conf, &dependencies)?;
let _ui_server = rpc::new_http("Parity Wallet (UI)", "ui", cmd.ui_conf.clone().into(), &dependencies, ui_middleware)?;
// minimal informant thread. Just prints block number every 5 seconds.
// TODO: integrate with informant.rs
let informant_client = service.client().clone();
::std::thread::spawn(move || loop {
info!("#{}", informant_client.best_block_header().number());
::std::thread::sleep(::std::time::Duration::from_secs(5));
});
// the informant
let informant = Arc::new(Informant::new(
LightNodeInformantData {
client: service.client().clone(),
sync: light_sync.clone(),
cache: cache,
},
None,
Some(rpc_stats),
cmd.logger_config.color,
));
// wait for ctrl-c.
Ok(wait_for_exit(None, None, can_restart))
service.register_handler(informant.clone()).map_err(|_| "Unable to register informant handler".to_owned())?;
// wait for ctrl-c and then shut down the informant.
let res = wait_for_exit(None, None, can_restart);
informant.shutdown();
Ok(res)
}
pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> Result<(bool, Option<String>), String> {
@@ -352,7 +376,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
}
// load spec
let spec = cmd.spec.spec()?;
let spec = cmd.spec.spec(&cmd.dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();
@@ -570,7 +594,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let (sync_provider, manage_network, chain_notify) = modules::sync(
&mut hypervisor,
sync_config,
net_conf.into(),
net_conf.clone().into(),
client.clone(),
snapshot_service.clone(),
client.clone(),
@@ -614,8 +638,20 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let (sync, client) = (sync_provider.clone(), client.clone());
let contract_client = Arc::new(::dapps::FullRegistrar { client: client.clone() });
struct SyncStatus(Arc<ethsync::SyncProvider>, Arc<Client>, ethsync::NetworkConfiguration);
impl dapps::SyncStatus for SyncStatus {
fn is_major_importing(&self) -> bool {
is_major_importing(Some(self.0.status().state), self.1.queue_info())
}
fn peers(&self) -> (usize, usize) {
let status = self.0.status();
(status.num_peers, status.current_max_peers(self.2.min_peers, self.2.max_peers) as usize)
}
}
dapps::Dependencies {
sync_status: Arc::new(move || is_major_importing(Some(sync.status().state), client.queue_info())),
sync_status: Arc::new(SyncStatus(sync, client, net_conf)),
pool: fetch.pool(),
contract_client: contract_client,
remote: event_loop.raw_remote(),
fetch: fetch.clone(),
@@ -624,7 +660,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
}
};
let dapps_middleware = dapps::new(cmd.dapps_conf.clone(), dapps_deps.clone())?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, dapps_deps)?;
let ui_middleware = dapps::new_ui(cmd.ui_conf.enabled, &cmd.ui_conf.ntp_server, dapps_deps)?;
let dapps_service = dapps::service(&dapps_middleware);
let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies {
@@ -652,6 +688,12 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
apis: deps_for_rpc_apis.clone(),
remote: event_loop.raw_remote(),
stats: rpc_stats.clone(),
pool: if cmd.http_conf.processing_threads > 0 {
Some(rpc::CpuPool::new(cmd.http_conf.processing_threads))
} else {
None
},
};
// start rpc servers
@@ -672,9 +714,11 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// the informant
let informant = Arc::new(Informant::new(
service.client(),
Some(sync_provider.clone()),
Some(manage_network.clone()),
FullNodeInformantData {
client: service.client(),
sync: Some(sync_provider.clone()),
net: Some(manage_network.clone()),
},
Some(snapshot_service.clone()),
Some(rpc_stats.clone()),
cmd.logger_config.color,

View File

@@ -133,7 +133,7 @@ impl SnapshotCommand {
// shared portion of snapshot commands: start the client service
fn start_service(self) -> Result<ClientService, String> {
// load spec file
let spec = self.spec.spec()?;
let spec = self.spec.spec(&self.dirs.cache)?;
// load genesis hash
let genesis_hash = spec.genesis_header().hash();