refactor(fetch) : light use only one DNS
thread (#9647)
* refactor(fetch) : light use only one `DNS` thread * grumbles(fetch) : pass number of threads directly
This commit is contained in:
parent
a8f6f5b974
commit
47848769ff
@ -77,7 +77,13 @@ const SNAPSHOT_HISTORY: u64 = 100;
|
|||||||
const GAS_CORPUS_EXPIRATION_MINUTES: u64 = 60 * 6;
|
const GAS_CORPUS_EXPIRATION_MINUTES: u64 = 60 * 6;
|
||||||
|
|
||||||
// Pops along with error messages when a password is missing or invalid.
|
// Pops along with error messages when a password is missing or invalid.
|
||||||
const VERIFY_PASSWORD_HINT: &'static str = "Make sure valid password is present in files passed using `--password` or in the configuration file.";
|
const VERIFY_PASSWORD_HINT: &str = "Make sure valid password is present in files passed using `--password` or in the configuration file.";
|
||||||
|
|
||||||
|
// Full client number of DNS threads
|
||||||
|
const FETCH_FULL_NUM_DNS_THREADS: usize = 4;
|
||||||
|
|
||||||
|
// Light client number of DNS threads
|
||||||
|
const FETCH_LIGHT_NUM_DNS_THREADS: usize = 1;
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub struct RunCmd {
|
pub struct RunCmd {
|
||||||
@ -283,7 +289,7 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<Runnin
|
|||||||
let cpu_pool = CpuPool::new(4);
|
let cpu_pool = CpuPool::new(4);
|
||||||
|
|
||||||
// fetch service
|
// fetch service
|
||||||
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
|
let fetch = fetch::Client::new(FETCH_LIGHT_NUM_DNS_THREADS).map_err(|e| format!("Error starting fetch client: {:?}", e))?;
|
||||||
let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
|
let passwords = passwords_from_files(&cmd.acc_conf.password_files)?;
|
||||||
|
|
||||||
// prepare account provider
|
// prepare account provider
|
||||||
@ -477,7 +483,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
|
|||||||
let event_loop = EventLoop::spawn();
|
let event_loop = EventLoop::spawn();
|
||||||
|
|
||||||
// fetch service
|
// fetch service
|
||||||
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;
|
let fetch = fetch::Client::new(FETCH_FULL_NUM_DNS_THREADS).map_err(|e| format!("Error starting fetch client: {:?}", e))?;
|
||||||
|
|
||||||
let txpool_size = cmd.miner_options.pool_limits.max_count;
|
let txpool_size = cmd.miner_options.pool_limits.max_count;
|
||||||
// create miner
|
// create miner
|
||||||
|
@ -170,11 +170,11 @@ impl Drop for Client {
|
|||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
/// Create a new fetch client.
|
/// Create a new fetch client.
|
||||||
pub fn new() -> Result<Self, Error> {
|
pub fn new(num_dns_threads: usize) -> Result<Self, Error> {
|
||||||
let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
|
let (tx_start, rx_start) = std::sync::mpsc::sync_channel(1);
|
||||||
let (tx_proto, rx_proto) = mpsc::channel(64);
|
let (tx_proto, rx_proto) = mpsc::channel(64);
|
||||||
|
|
||||||
Client::background_thread(tx_start, rx_proto)?;
|
Client::background_thread(tx_start, rx_proto, num_dns_threads)?;
|
||||||
|
|
||||||
match rx_start.recv_timeout(Duration::from_secs(10)) {
|
match rx_start.recv_timeout(Duration::from_secs(10)) {
|
||||||
Err(RecvTimeoutError::Timeout) => {
|
Err(RecvTimeoutError::Timeout) => {
|
||||||
@ -199,7 +199,7 @@ impl Client {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>) -> io::Result<thread::JoinHandle<()>> {
|
fn background_thread(tx_start: TxStartup, rx_proto: mpsc::Receiver<ChanItem>, num_dns_threads: usize) -> io::Result<thread::JoinHandle<()>> {
|
||||||
thread::Builder::new().name("fetch".into()).spawn(move || {
|
thread::Builder::new().name("fetch".into()).spawn(move || {
|
||||||
let mut core = match reactor::Core::new() {
|
let mut core = match reactor::Core::new() {
|
||||||
Ok(c) => c,
|
Ok(c) => c,
|
||||||
@ -208,7 +208,7 @@ impl Client {
|
|||||||
|
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
let hyper = hyper::Client::configure()
|
let hyper = hyper::Client::configure()
|
||||||
.connector(hyper_rustls::HttpsConnector::new(4, &core.handle()))
|
.connector(hyper_rustls::HttpsConnector::new(num_dns_threads, &core.handle()))
|
||||||
.build(&core.handle());
|
.build(&core.handle());
|
||||||
|
|
||||||
let future = rx_proto.take_while(|item| Ok(item.is_some()))
|
let future = rx_proto.take_while(|item| Ok(item.is_some()))
|
||||||
@ -640,7 +640,18 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_fetch() {
|
fn it_should_fetch() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
|
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
|
||||||
|
let resp = future.wait().unwrap();
|
||||||
|
assert!(resp.is_success());
|
||||||
|
let body = resp.concat2().wait().unwrap();
|
||||||
|
assert_eq!(&body[..], b"123")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn it_should_fetch_in_light_mode() {
|
||||||
|
let server = TestServer::run();
|
||||||
|
let client = Client::new(1).unwrap();
|
||||||
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
|
let future = client.get(&format!("http://{}?123", server.addr()), Default::default());
|
||||||
let resp = future.wait().unwrap();
|
let resp = future.wait().unwrap();
|
||||||
assert!(resp.is_success());
|
assert!(resp.is_success());
|
||||||
@ -651,7 +662,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_timeout() {
|
fn it_should_timeout() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
|
let abort = Abort::default().with_max_duration(Duration::from_secs(1));
|
||||||
match client.get(&format!("http://{}/delay?3", server.addr()), abort).wait() {
|
match client.get(&format!("http://{}/delay?3", server.addr()), abort).wait() {
|
||||||
Err(Error::Timeout) => {}
|
Err(Error::Timeout) => {}
|
||||||
@ -662,7 +673,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_follow_redirects() {
|
fn it_should_follow_redirects() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default();
|
let abort = Abort::default();
|
||||||
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
|
let future = client.get(&format!("http://{}/redirect?http://{}/", server.addr(), server.addr()), abort);
|
||||||
assert!(future.wait().unwrap().is_success())
|
assert!(future.wait().unwrap().is_success())
|
||||||
@ -671,7 +682,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_follow_relative_redirects() {
|
fn it_should_follow_relative_redirects() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default().with_max_redirects(4);
|
let abort = Abort::default().with_max_redirects(4);
|
||||||
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort);
|
let future = client.get(&format!("http://{}/redirect?/", server.addr()), abort);
|
||||||
assert!(future.wait().unwrap().is_success())
|
assert!(future.wait().unwrap().is_success())
|
||||||
@ -680,7 +691,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_not_follow_too_many_redirects() {
|
fn it_should_not_follow_too_many_redirects() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default().with_max_redirects(3);
|
let abort = Abort::default().with_max_redirects(3);
|
||||||
match client.get(&format!("http://{}/loop", server.addr()), abort).wait() {
|
match client.get(&format!("http://{}/loop", server.addr()), abort).wait() {
|
||||||
Err(Error::TooManyRedirects) => {}
|
Err(Error::TooManyRedirects) => {}
|
||||||
@ -691,7 +702,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_read_data() {
|
fn it_should_read_data() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default();
|
let abort = Abort::default();
|
||||||
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
|
let future = client.get(&format!("http://{}?abcdefghijklmnopqrstuvwxyz", server.addr()), abort);
|
||||||
let resp = future.wait().unwrap();
|
let resp = future.wait().unwrap();
|
||||||
@ -702,7 +713,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_not_read_too_much_data() {
|
fn it_should_not_read_too_much_data() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default().with_max_size(3);
|
let abort = Abort::default().with_max_size(3);
|
||||||
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
|
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
|
||||||
assert!(resp.is_success());
|
assert!(resp.is_success());
|
||||||
@ -715,7 +726,7 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn it_should_not_read_too_much_data_sync() {
|
fn it_should_not_read_too_much_data_sync() {
|
||||||
let server = TestServer::run();
|
let server = TestServer::run();
|
||||||
let client = Client::new().unwrap();
|
let client = Client::new(4).unwrap();
|
||||||
let abort = Abort::default().with_max_size(3);
|
let abort = Abort::default().with_max_size(3);
|
||||||
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
|
let resp = client.get(&format!("http://{}/?1234", server.addr()), abort).wait().unwrap();
|
||||||
assert!(resp.is_success());
|
assert!(resp.is_success());
|
||||||
|
Loading…
Reference in New Issue
Block a user