Ensure jsonrpc threading settings are sane (#11267)
* Ensure jsonrpc threading settings are sane Starting with `jsonrpc` v14, the "server threads" setting is more important than before and the current default of 1 means the https server is effectively single-threaded. This PR proposes a new default of 4 (and ensures that crazy settings like e.g. `0` are bumped to at least `1`). Also included: some docs, tests and cosmetics. * Update parity/rpc.rs Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Update parity/rpc.rs Co-Authored-By: Tomasz Drwięga <tomusdrw@users.noreply.github.com> * Remove (i.e. deprecate) `--jsonrpc-threads` command line option * Call numbers NUM * Don't show a default for --jsonrpc-threads (deprecated) * Show deprecation warning when using `--jsonrpc-threads` or `processing_threads` * Update parity/deprecated.rs Co-Authored-By: Niklas Adolfsson <niklasadolfsson1@gmail.com> * Fix test * Fix tests for real
This commit is contained in:
		
							parent
							
								
									93f700d961
								
							
						
					
					
						commit
						82c3265858
					
				@ -506,18 +506,18 @@ usage! {
 | 
			
		||||
			"--jsonrpc-hosts=[HOSTS]",
 | 
			
		||||
			"List of allowed Host header values. This option will validate the Host header sent by the browser, it is additional security against some attack vectors. Special options: \"all\", \"none\",.",
 | 
			
		||||
 | 
			
		||||
			ARG arg_jsonrpc_threads: (usize) = 4usize, or |c: &Config| c.rpc.as_ref()?.processing_threads,
 | 
			
		||||
			"--jsonrpc-threads=[THREADS]",
 | 
			
		||||
			"Turn on additional processing threads for JSON-RPC servers (all transports). Setting this to a non-zero value allows parallel execution of cpu-heavy queries.",
 | 
			
		||||
			ARG arg_jsonrpc_threads: (Option<usize>) = None, or |_| None,
 | 
			
		||||
			"--jsonrpc-threads=[NUM]",
 | 
			
		||||
			"DEPRECATED, DOES NOTHING",
 | 
			
		||||
 | 
			
		||||
			ARG arg_jsonrpc_server_threads: (Option<usize>) = Some(4), or |c: &Config| c.rpc.as_ref()?.server_threads,
 | 
			
		||||
			"--jsonrpc-server-threads=[NUM]",
 | 
			
		||||
			"Enables multiple threads handling incoming connections for HTTP JSON-RPC server.",
 | 
			
		||||
 | 
			
		||||
			ARG arg_jsonrpc_cors: (String) = "none", or |c: &Config| c.rpc.as_ref()?.cors.as_ref().map(|vec| vec.join(",")),
 | 
			
		||||
			"--jsonrpc-cors=[URL]",
 | 
			
		||||
			"Specify CORS header for HTTP JSON-RPC API responses. Special options: \"all\", \"none\".",
 | 
			
		||||
 | 
			
		||||
			ARG arg_jsonrpc_server_threads: (Option<usize>) = None, or |c: &Config| c.rpc.as_ref()?.server_threads,
 | 
			
		||||
			"--jsonrpc-server-threads=[NUM]",
 | 
			
		||||
			"Enables multiple threads handling incoming connections for HTTP JSON-RPC server.",
 | 
			
		||||
 | 
			
		||||
			ARG arg_jsonrpc_max_payload: (Option<usize>) = None, or |c: &Config| c.rpc.as_ref()?.max_payload,
 | 
			
		||||
			"--jsonrpc-max-payload=[MB]",
 | 
			
		||||
			"Specify maximum size for HTTP JSON-RPC requests in megabytes.",
 | 
			
		||||
@ -1262,7 +1262,6 @@ struct Rpc {
 | 
			
		||||
	apis: Option<Vec<String>>,
 | 
			
		||||
	hosts: Option<Vec<String>>,
 | 
			
		||||
	server_threads: Option<usize>,
 | 
			
		||||
	processing_threads: Option<usize>,
 | 
			
		||||
	max_payload: Option<usize>,
 | 
			
		||||
	keep_alive: Option<bool>,
 | 
			
		||||
	experimental_rpcs: Option<bool>,
 | 
			
		||||
@ -1816,8 +1815,8 @@ mod tests {
 | 
			
		||||
			arg_jsonrpc_cors: "null".into(),
 | 
			
		||||
			arg_jsonrpc_apis: "web3,eth,net,parity,traces,rpc,secretstore".into(),
 | 
			
		||||
			arg_jsonrpc_hosts: "none".into(),
 | 
			
		||||
			arg_jsonrpc_server_threads: None,
 | 
			
		||||
			arg_jsonrpc_threads: 4,
 | 
			
		||||
			arg_jsonrpc_server_threads: Some(4),
 | 
			
		||||
			arg_jsonrpc_threads: None, // DEPRECATED, does nothing
 | 
			
		||||
			arg_jsonrpc_max_payload: None,
 | 
			
		||||
			arg_poll_lifetime: 60u32,
 | 
			
		||||
			flag_jsonrpc_allow_missing_blocks: false,
 | 
			
		||||
@ -2095,8 +2094,7 @@ mod tests {
 | 
			
		||||
				cors: None,
 | 
			
		||||
				apis: None,
 | 
			
		||||
				hosts: None,
 | 
			
		||||
				server_threads: None,
 | 
			
		||||
				processing_threads: None,
 | 
			
		||||
				server_threads: Some(13),
 | 
			
		||||
				max_payload: None,
 | 
			
		||||
				keep_alive: None,
 | 
			
		||||
				experimental_rpcs: None,
 | 
			
		||||
 | 
			
		||||
@ -31,6 +31,7 @@ origins = ["none"]
 | 
			
		||||
[rpc]
 | 
			
		||||
disable = true
 | 
			
		||||
port = 8180
 | 
			
		||||
server_threads = 13
 | 
			
		||||
 | 
			
		||||
[ipc]
 | 
			
		||||
apis = ["rpc", "eth"]
 | 
			
		||||
 | 
			
		||||
@ -367,49 +367,49 @@ impl Configuration {
 | 
			
		||||
			let (private_provider_conf, private_enc_conf, private_tx_enabled) = self.private_provider_config()?;
 | 
			
		||||
 | 
			
		||||
			let run_cmd = RunCmd {
 | 
			
		||||
				cache_config: cache_config,
 | 
			
		||||
				dirs: dirs,
 | 
			
		||||
				spec: spec,
 | 
			
		||||
				pruning: pruning,
 | 
			
		||||
				pruning_history: pruning_history,
 | 
			
		||||
				cache_config,
 | 
			
		||||
				dirs,
 | 
			
		||||
				spec,
 | 
			
		||||
				pruning,
 | 
			
		||||
				pruning_history,
 | 
			
		||||
				pruning_memory: self.args.arg_pruning_memory,
 | 
			
		||||
				daemon: daemon,
 | 
			
		||||
				daemon,
 | 
			
		||||
				logger_config: logger_config.clone(),
 | 
			
		||||
				miner_options: self.miner_options()?,
 | 
			
		||||
				gas_price_percentile: self.args.arg_gas_price_percentile,
 | 
			
		||||
				poll_lifetime: self.args.arg_poll_lifetime,
 | 
			
		||||
				ws_conf: ws_conf,
 | 
			
		||||
				snapshot_conf: snapshot_conf,
 | 
			
		||||
				http_conf: http_conf,
 | 
			
		||||
				ipc_conf: ipc_conf,
 | 
			
		||||
				net_conf: net_conf,
 | 
			
		||||
				network_id: network_id,
 | 
			
		||||
				ws_conf,
 | 
			
		||||
				snapshot_conf,
 | 
			
		||||
				http_conf,
 | 
			
		||||
				ipc_conf,
 | 
			
		||||
				net_conf,
 | 
			
		||||
				network_id,
 | 
			
		||||
				acc_conf: self.accounts_config()?,
 | 
			
		||||
				gas_pricer_conf: self.gas_pricer_config()?,
 | 
			
		||||
				miner_extras: self.miner_extras()?,
 | 
			
		||||
				stratum: self.stratum_options()?,
 | 
			
		||||
				update_policy: update_policy,
 | 
			
		||||
				update_policy,
 | 
			
		||||
				allow_missing_blocks: self.args.flag_jsonrpc_allow_missing_blocks,
 | 
			
		||||
				mode: mode,
 | 
			
		||||
				tracing: tracing,
 | 
			
		||||
				fat_db: fat_db,
 | 
			
		||||
				compaction: compaction,
 | 
			
		||||
				vm_type: vm_type,
 | 
			
		||||
				warp_sync: warp_sync,
 | 
			
		||||
				mode,
 | 
			
		||||
				tracing,
 | 
			
		||||
				fat_db,
 | 
			
		||||
				compaction,
 | 
			
		||||
				vm_type,
 | 
			
		||||
				warp_sync,
 | 
			
		||||
				warp_barrier: self.args.arg_warp_barrier,
 | 
			
		||||
				geth_compatibility: geth_compatibility,
 | 
			
		||||
				geth_compatibility,
 | 
			
		||||
				experimental_rpcs,
 | 
			
		||||
				net_settings: self.network_settings()?,
 | 
			
		||||
				ipfs_conf: ipfs_conf,
 | 
			
		||||
				secretstore_conf: secretstore_conf,
 | 
			
		||||
				private_provider_conf: private_provider_conf,
 | 
			
		||||
				ipfs_conf,
 | 
			
		||||
				secretstore_conf,
 | 
			
		||||
				private_provider_conf,
 | 
			
		||||
				private_encryptor_conf: private_enc_conf,
 | 
			
		||||
				private_tx_enabled,
 | 
			
		||||
				name: self.args.arg_identity,
 | 
			
		||||
				custom_bootnodes: self.args.arg_bootnodes.is_some(),
 | 
			
		||||
				check_seal: !self.args.flag_no_seal_check,
 | 
			
		||||
				download_old_blocks: !self.args.flag_no_ancient_blocks,
 | 
			
		||||
				verifier_settings: verifier_settings,
 | 
			
		||||
				verifier_settings,
 | 
			
		||||
				serve_light: !self.args.flag_no_serve_light,
 | 
			
		||||
				light: self.args.flag_light,
 | 
			
		||||
				no_persistent_txqueue: self.args.flag_no_persistent_txqueue,
 | 
			
		||||
@ -426,7 +426,7 @@ impl Configuration {
 | 
			
		||||
 | 
			
		||||
		Ok(Execute {
 | 
			
		||||
			logger: logger_config,
 | 
			
		||||
			cmd: cmd,
 | 
			
		||||
			cmd,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -885,24 +885,20 @@ impl Configuration {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	fn http_config(&self) -> Result<HttpConfiguration, String> {
 | 
			
		||||
		let conf = HttpConfiguration {
 | 
			
		||||
			enabled: self.rpc_enabled(),
 | 
			
		||||
			interface: self.rpc_interface(),
 | 
			
		||||
			port: self.args.arg_ports_shift + self.args.arg_rpcport.unwrap_or(self.args.arg_jsonrpc_port),
 | 
			
		||||
			apis: self.rpc_apis().parse()?,
 | 
			
		||||
			hosts: self.rpc_hosts(),
 | 
			
		||||
			cors: self.rpc_cors(),
 | 
			
		||||
			server_threads: match self.args.arg_jsonrpc_server_threads {
 | 
			
		||||
				Some(threads) if threads > 0 => threads,
 | 
			
		||||
				_ => 1,
 | 
			
		||||
			},
 | 
			
		||||
			processing_threads: self.args.arg_jsonrpc_threads,
 | 
			
		||||
			max_payload: match self.args.arg_jsonrpc_max_payload {
 | 
			
		||||
				Some(max) if max > 0 => max as usize,
 | 
			
		||||
				_ => 5usize,
 | 
			
		||||
			},
 | 
			
		||||
			keep_alive: !self.args.flag_jsonrpc_no_keep_alive,
 | 
			
		||||
		};
 | 
			
		||||
		let mut conf = HttpConfiguration::default();
 | 
			
		||||
		conf.enabled = self.rpc_enabled();
 | 
			
		||||
		conf.interface = self.rpc_interface();
 | 
			
		||||
		conf.port = self.args.arg_ports_shift + self.args.arg_rpcport.unwrap_or(self.args.arg_jsonrpc_port);
 | 
			
		||||
		conf.apis = self.rpc_apis().parse()?;
 | 
			
		||||
		conf.hosts = self.rpc_hosts();
 | 
			
		||||
		conf.cors = self.rpc_cors();
 | 
			
		||||
		if let Some(threads) = self.args.arg_jsonrpc_server_threads {
 | 
			
		||||
			conf.server_threads = std::cmp::max(1, threads);
 | 
			
		||||
		}
 | 
			
		||||
		if let Some(max_payload) = self.args.arg_jsonrpc_max_payload {
 | 
			
		||||
			conf.max_payload = std::cmp::max(1, max_payload);
 | 
			
		||||
		}
 | 
			
		||||
		conf.keep_alive = !self.args.flag_jsonrpc_no_keep_alive;
 | 
			
		||||
 | 
			
		||||
		Ok(conf)
 | 
			
		||||
	}
 | 
			
		||||
@ -1626,6 +1622,28 @@ mod tests {
 | 
			
		||||
		assert_eq!(conf3.rpc_hosts(), Some(vec!["parity.io".into(), "something.io".into()]));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn ensures_sane_http_settings() {
 | 
			
		||||
		// given incorrect settings
 | 
			
		||||
		let conf = parse(&["parity",
 | 
			
		||||
			"--jsonrpc-server-threads=0",
 | 
			
		||||
			"--jsonrpc-max-payload=0",
 | 
			
		||||
		]);
 | 
			
		||||
 | 
			
		||||
		// then things are adjusted to Just Work.
 | 
			
		||||
		let http_conf = conf.http_config().unwrap();
 | 
			
		||||
		assert_eq!(http_conf.server_threads, 1);
 | 
			
		||||
		assert_eq!(http_conf.max_payload, 1);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn jsonrpc_threading_defaults() {
 | 
			
		||||
		let conf = parse(&["parity"]);
 | 
			
		||||
		let http_conf = conf.http_config().unwrap();
 | 
			
		||||
		assert_eq!(http_conf.server_threads, 4);
 | 
			
		||||
		assert_eq!(http_conf.max_payload, 5);
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	#[test]
 | 
			
		||||
	fn should_parse_ipfs_hosts() {
 | 
			
		||||
		// given
 | 
			
		||||
 | 
			
		||||
@ -239,6 +239,10 @@ pub fn find_deprecated(args: &Args) -> Vec<Deprecated> {
 | 
			
		||||
		result.push(Deprecated::Removed("--whisper-pool-size"));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if args.arg_jsonrpc_threads.is_some() {
 | 
			
		||||
		result.push(Deprecated::Removed("--jsonrpc--threads (aka processing_threads)"));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	result
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -193,9 +193,12 @@ pub enum ExecutionAction {
 | 
			
		||||
fn execute<Cr, Rr>(
 | 
			
		||||
	command: Execute,
 | 
			
		||||
	logger: Arc<RotatingLogger>,
 | 
			
		||||
	on_client_rq: Cr, on_updater_rq: Rr) -> Result<ExecutionAction, String>
 | 
			
		||||
	where Cr: Fn(String) + 'static + Send,
 | 
			
		||||
		  Rr: Fn() + 'static + Send
 | 
			
		||||
	on_client_rq: Cr,
 | 
			
		||||
	on_updater_rq: Rr
 | 
			
		||||
) -> Result<ExecutionAction, String>
 | 
			
		||||
	where
 | 
			
		||||
		Cr: Fn(String) + 'static + Send,
 | 
			
		||||
		Rr: Fn() + 'static + Send
 | 
			
		||||
{
 | 
			
		||||
	#[cfg(feature = "deadlock_detection")]
 | 
			
		||||
	run_deadlock_detection_thread();
 | 
			
		||||
 | 
			
		||||
@ -35,15 +35,24 @@ pub const DAPPS_DOMAIN: &'static str = "web3.site";
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, PartialEq)]
 | 
			
		||||
pub struct HttpConfiguration {
 | 
			
		||||
	/// Is RPC over HTTP enabled (default is true)?
 | 
			
		||||
	pub enabled: bool,
 | 
			
		||||
	/// The IP of the network interface used (default is 127.0.0.1).
 | 
			
		||||
	pub interface: String,
 | 
			
		||||
	/// The network port (default is 8545).
 | 
			
		||||
	pub port: u16,
 | 
			
		||||
	/// The categories of RPC calls enabled.
 | 
			
		||||
	pub apis: ApiSet,
 | 
			
		||||
	/// CORS headers
 | 
			
		||||
	pub cors: Option<Vec<String>>,
 | 
			
		||||
	/// Specify a list of valid hosts we accept requests from.
 | 
			
		||||
	pub hosts: Option<Vec<String>>,
 | 
			
		||||
	/// Number of HTTP server threads to use to handle incoming requests (default is 4).
 | 
			
		||||
	pub server_threads: usize,
 | 
			
		||||
	pub processing_threads: usize,
 | 
			
		||||
	/// Sets the maximum size of a request body in megabytes (default is 5 MiB).
 | 
			
		||||
	pub max_payload: usize,
 | 
			
		||||
	/// Use keepalive messages on the underlying socket: SO_KEEPALIVE as well as the TCP_KEEPALIVE
 | 
			
		||||
	/// or TCP_KEEPIDLE options depending on your platform (default is true).
 | 
			
		||||
	pub keep_alive: bool,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -56,8 +65,7 @@ impl Default for HttpConfiguration {
 | 
			
		||||
			apis: ApiSet::UnsafeContext,
 | 
			
		||||
			cors: Some(vec![]),
 | 
			
		||||
			hosts: Some(vec![]),
 | 
			
		||||
			server_threads: 1,
 | 
			
		||||
			processing_threads: 4,
 | 
			
		||||
			server_threads: 4,
 | 
			
		||||
			max_payload: 5,
 | 
			
		||||
			keep_alive: true,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -284,7 +284,8 @@ fn execute_light_impl<Cr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq
 | 
			
		||||
	let light_sync = Arc::new(light_sync);
 | 
			
		||||
	*sync_handle.write() = Arc::downgrade(&light_sync);
 | 
			
		||||
 | 
			
		||||
	// spin up event loop
 | 
			
		||||
	// Spin up the Tokio event loop with core_threads = number of logical cores on the machine.
 | 
			
		||||
	// This runtime is shared among many subsystems: sync, rpc processing, tx broadcasting, price fetcher etc
 | 
			
		||||
	let runtime = Runtime::with_default_thread_count();
 | 
			
		||||
 | 
			
		||||
	// start the network.
 | 
			
		||||
@ -361,9 +362,14 @@ fn execute_light_impl<Cr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq: Cr,
 | 
			
		||||
						on_updater_rq: Rr) -> Result<RunningClient, String>
 | 
			
		||||
	where Cr: Fn(String) + 'static + Send,
 | 
			
		||||
fn execute_impl<Cr, Rr>(
 | 
			
		||||
	cmd: RunCmd,
 | 
			
		||||
	logger: Arc<RotatingLogger>,
 | 
			
		||||
	on_client_rq: Cr,
 | 
			
		||||
	on_updater_rq: Rr
 | 
			
		||||
) -> Result<RunningClient, String>
 | 
			
		||||
	where
 | 
			
		||||
		Cr: Fn(String) + 'static + Send,
 | 
			
		||||
		Rr: Fn() + 'static + Send
 | 
			
		||||
{
 | 
			
		||||
	// load spec
 | 
			
		||||
@ -477,7 +483,8 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
 | 
			
		||||
	// prepare account provider
 | 
			
		||||
	let account_provider = Arc::new(account_utils::prepare_account_provider(&cmd.spec, &cmd.dirs, &spec.data_dir, cmd.acc_conf, &passwords)?);
 | 
			
		||||
 | 
			
		||||
	// spin up event loop
 | 
			
		||||
	// Spin up the Tokio event loop with core_threads = number of logical cores on the machine.
 | 
			
		||||
	// This runtime is shared among many subsystems: sync, rpc processing, tx broadcasting, price fetcher etc
 | 
			
		||||
	let runtime = Runtime::with_default_thread_count();
 | 
			
		||||
 | 
			
		||||
	// fetch service
 | 
			
		||||
@ -910,9 +917,14 @@ impl RunningClient {
 | 
			
		||||
/// `on_updater_rq` is the action to perform when the updater has a new binary to execute.
 | 
			
		||||
///
 | 
			
		||||
/// On error, returns what to print on stderr.
 | 
			
		||||
pub fn execute<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>,
 | 
			
		||||
						on_client_rq: Cr, on_updater_rq: Rr) -> Result<RunningClient, String>
 | 
			
		||||
	where Cr: Fn(String) + 'static + Send,
 | 
			
		||||
pub fn execute<Cr, Rr>(
 | 
			
		||||
	cmd: RunCmd,
 | 
			
		||||
	logger: Arc<RotatingLogger>,
 | 
			
		||||
	on_client_rq: Cr,
 | 
			
		||||
	on_updater_rq: Rr
 | 
			
		||||
) -> Result<RunningClient, String>
 | 
			
		||||
	where
 | 
			
		||||
		Cr: Fn(String) + 'static + Send,
 | 
			
		||||
		Rr: Fn() + 'static + Send
 | 
			
		||||
{
 | 
			
		||||
	if cmd.light {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user