Splitting informant,io_handler and webapps

This commit is contained in:
Tomasz Drwięga 2016-04-21 13:57:27 +02:00
parent 09b2d7b3a6
commit 3e4adcb3b6
5 changed files with 335 additions and 230 deletions

89
parity/informant.rs Normal file
View File

@ -0,0 +1,89 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::RwLock;
use std::ops::{Deref, DerefMut};
use ethsync::{EthSync, SyncProvider};
use ethcore::client::*;
use number_prefix::{binary_prefix, Standalone, Prefixed};
pub struct Informant {
chain_info: RwLock<Option<BlockChainInfo>>,
cache_info: RwLock<Option<BlockChainCacheSize>>,
report: RwLock<Option<ClientReport>>,
}
impl Default for Informant {
fn default() -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
}
}
}
impl Informant {
fn format_bytes(b: usize) -> String {
match binary_prefix(b as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
pub fn tick(&self, client: &Client, sync: &EthSync) {
// 5 seconds betwen calls. TODO: calculate this properly.
let dur = 5usize;
let chain_info = client.chain_info();
let queue_info = client.queue_info();
let cache_info = client.blockchain_cache_info();
let sync_info = sync.status();
let mut write_report = self.report.write().unwrap();
let report = client.report();
if let (_, _, &Some(ref last_report)) = (
self.chain_info.read().unwrap().deref(),
self.cache_info.read().unwrap().deref(),
write_report.deref()
) {
println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]",
chain_info.best_block_number,
chain_info.best_block_hash,
(report.blocks_imported - last_report.blocks_imported) / dur,
(report.transactions_applied - last_report.transactions_applied) / dur,
(report.gas_processed - last_report.gas_processed) / From::from(dur),
sync_info.num_active_peers,
sync_info.num_peers,
sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number),
queue_info.unverified_queue_size,
queue_info.verified_queue_size,
Informant::format_bytes(report.state_db_mem),
Informant::format_bytes(cache_info.total()),
Informant::format_bytes(queue_info.mem_used),
Informant::format_bytes(sync_info.mem_used),
);
}
*self.chain_info.write().unwrap().deref_mut() = Some(chain_info);
*self.cache_info.write().unwrap().deref_mut() = Some(cache_info);
*write_report.deref_mut() = Some(report);
}
}

54
parity/io_handler.rs Normal file
View File

@ -0,0 +1,54 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::NetSyncMessage;
use ethsync::EthSync;
use util::keys::store::AccountService;
use util::{TimerToken, IoHandler, IoContext};
use informant::Informant;
const INFO_TIMER: TimerToken = 0;
const ACCOUNT_TICK_TIMER: TimerToken = 10;
const ACCOUNT_TICK_MS: u64 = 60000;
pub struct ClientIoHandler {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub accounts: Arc<AccountService>,
pub info: Informant,
}
impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<NetSyncMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
io.register_timer(ACCOUNT_TICK_TIMER, ACCOUNT_TICK_MS).expect("Error registering account timer");
}
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
match timer {
INFO_TIMER => { self.info.tick(&self.client, &self.sync); }
ACCOUNT_TICK_TIMER => { self.accounts.tick(); },
_ => {}
}
}
}

View File

@ -42,7 +42,6 @@ extern crate ethcore_ipc as ipc;
extern crate ethcore_ipc_nano as nanoipc;
extern crate serde;
extern crate bincode;
// for price_info.rs
#[macro_use] extern crate hyper;
@ -50,7 +49,7 @@ extern crate bincode;
extern crate ethcore_rpc;
#[cfg(feature = "webapp")]
extern crate ethcore_webapp as webapp;
extern crate ethcore_webapp;
use std::io::{BufRead, BufReader};
use std::fs::File;
@ -63,16 +62,12 @@ use util::panics::{MayPanic, ForwardPanic, PanicHandler};
use util::keys::store::*;
use ethcore::spec::*;
use ethcore::client::*;
use ethcore::service::{ClientService, NetSyncMessage};
use ethcore::ethereum;
use ethsync::{EthSync, SyncConfig, SyncProvider};
use ethcore::service::ClientService;
use ethsync::{EthSync, SyncConfig};
use ethminer::{Miner, MinerService};
use docopt::Docopt;
use daemonize::Daemonize;
use number_prefix::{binary_prefix, Standalone, Prefixed};
#[cfg(feature = "webapp")]
use webapp::Server as WebappServer;
#[macro_use]
mod die;
@ -81,9 +76,14 @@ mod upgrade;
mod hypervisor;
mod setup_log;
mod rpc;
mod webapp;
mod informant;
mod io_handler;
use die::*;
use rpc::RpcServer;
use webapp::WebappServer;
use io_handler::ClientIoHandler;
const USAGE: &'static str = r#"
Parity. Ethereum Client.
@ -278,58 +278,6 @@ struct Args {
}
#[cfg(feature = "webapp")]
fn setup_webapp_server(
client: Arc<Client>,
sync: Arc<EthSync>,
secret_store: Arc<AccountService>,
miner: Arc<Miner>,
url: &SocketAddr,
auth: Option<(String, String)>,
logger: Arc<RotatingLogger>,
) -> WebappServer {
use ethcore_rpc::v1::*;
let server = webapp::ServerBuilder::new();
server.add_delegate(Web3Client::new().to_delegate());
server.add_delegate(NetClient::new(&sync).to_delegate());
server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate());
server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate());
server.add_delegate(PersonalClient::new(&secret_store).to_delegate());
server.add_delegate(EthcoreClient::new(&miner, logger).to_delegate());
let start_result = match auth {
None => {
server.start_unsecure_http(url)
},
Some((username, password)) => {
server.start_basic_auth_http(url, &username, &password)
},
};
match start_result {
Err(webapp::ServerError::IoError(err)) => die_with_io_error(err),
Err(e) => die!("{:?}", e),
Ok(handle) => handle,
}
}
#[cfg(not(feature = "webapp"))]
struct WebappServer;
#[cfg(not(feature = "webapp"))]
fn setup_webapp_server(
_client: Arc<Client>,
_sync: Arc<EthSync>,
_secret_store: Arc<AccountService>,
_miner: Arc<Miner>,
_url: &SocketAddr,
_auth: Option<(String, String)>,
_logger: Arc<RotatingLogger>,
) -> ! {
die!("Your Parity version has been compiled without WebApps support.")
}
fn print_version() {
println!("\
Parity
@ -644,66 +592,33 @@ impl Configuration {
let sync = EthSync::register(service.network(), sync_config, client.clone(), miner.clone());
// Setup rpc
let rpc_server = if self.args.flag_jsonrpc || self.args.flag_rpc {
let apis = self.args.flag_rpcapi.as_ref().unwrap_or(&self.args.flag_jsonrpc_apis);
let url = format!("{}:{}",
match self.args.flag_rpcaddr.as_ref().unwrap_or(&self.args.flag_jsonrpc_interface).as_str() {
"all" => "0.0.0.0",
"local" => "127.0.0.1",
x => x,
},
self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port)
);
let addr = SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url));
let cors_domain = self.args.flag_jsonrpc_cors.clone().or(self.args.flag_rpccorsdomain.clone());
let rpc_server = rpc::new(rpc::Configuration {
enabled: self.args.flag_jsonrpc || self.args.flag_rpc,
interface: self.args.flag_rpcaddr.clone().unwrap_or(self.args.flag_jsonrpc_interface.clone()),
port: self.args.flag_rpcport.unwrap_or(self.args.flag_jsonrpc_port),
apis: self.args.flag_rpcapi.clone().unwrap_or(self.args.flag_jsonrpc_apis.clone()),
cors: self.args.flag_jsonrpc_cors.clone().or(self.args.flag_rpccorsdomain.clone()),
}, rpc::Dependencies {
client: client.clone(),
sync: sync.clone(),
secret_store: account_service.clone(),
miner: miner.clone(),
logger: logger.clone()
});
Some(rpc::setup_rpc_server(
service.client(),
sync.clone(),
account_service.clone(),
miner.clone(),
&addr,
cors_domain,
apis.split(',').collect(),
logger.clone(),
))
} else {
None
};
let webapp_server = if self.args.flag_webapp {
let url = format!("{}:{}",
match self.args.flag_webapp_interface.as_str() {
"all" => "0.0.0.0",
"local" => "127.0.0.1",
x => x,
},
self.args.flag_webapp_port
);
let addr = SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid Webapps listen host/port given.", url));
let auth = self.args.flag_webapp_user.as_ref().map(|username| {
let password = self.args.flag_webapp_pass.as_ref().map_or_else(|| {
use rpassword::read_password;
println!("Type password for WebApps server (user: {}): ", username);
let pass = read_password().unwrap();
println!("OK, got it. Starting server...");
pass
}, |pass| pass.to_owned());
(username.to_owned(), password)
});
Some(setup_webapp_server(
service.client(),
sync.clone(),
account_service.clone(),
miner.clone(),
&addr,
auth,
logger.clone(),
))
} else {
None
};
let webapp_server = webapp::new(webapp::Configuration {
enabled: self.args.flag_webapp,
interface: self.args.flag_webapp_interface.clone(),
port: self.args.flag_webapp_port,
user: self.args.flag_webapp_user.clone(),
pass: self.args.flag_webapp_pass.clone(),
}, webapp::Dependencies {
client: client.clone(),
sync: sync.clone(),
secret_store: account_service.clone(),
miner: miner.clone(),
logger: logger.clone()
});
// Register IO handler
let io_handler = Arc::new(ClientIoHandler {
@ -740,101 +655,6 @@ fn main() {
Configuration::parse().execute();
}
struct Informant {
chain_info: RwLock<Option<BlockChainInfo>>,
cache_info: RwLock<Option<BlockChainCacheSize>>,
report: RwLock<Option<ClientReport>>,
}
impl Default for Informant {
fn default() -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
}
}
}
impl Informant {
fn format_bytes(b: usize) -> String {
match binary_prefix(b as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
pub fn tick(&self, client: &Client, sync: &EthSync) {
// 5 seconds betwen calls. TODO: calculate this properly.
let dur = 5usize;
let chain_info = client.chain_info();
let queue_info = client.queue_info();
let cache_info = client.blockchain_cache_info();
let sync_info = sync.status();
let mut write_report = self.report.write().unwrap();
let report = client.report();
if let (_, _, &Some(ref last_report)) = (
self.chain_info.read().unwrap().deref(),
self.cache_info.read().unwrap().deref(),
write_report.deref()
) {
println!("[ #{} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, #{}, {}+{} queued ···// mem: {} db, {} chain, {} queue, {} sync ]",
chain_info.best_block_number,
chain_info.best_block_hash,
(report.blocks_imported - last_report.blocks_imported) / dur,
(report.transactions_applied - last_report.transactions_applied) / dur,
(report.gas_processed - last_report.gas_processed) / From::from(dur),
sync_info.num_active_peers,
sync_info.num_peers,
sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number),
queue_info.unverified_queue_size,
queue_info.verified_queue_size,
Informant::format_bytes(report.state_db_mem),
Informant::format_bytes(cache_info.total()),
Informant::format_bytes(queue_info.mem_used),
Informant::format_bytes(sync_info.mem_used),
);
}
*self.chain_info.write().unwrap().deref_mut() = Some(chain_info);
*self.cache_info.write().unwrap().deref_mut() = Some(cache_info);
*write_report.deref_mut() = Some(report);
}
}
const INFO_TIMER: TimerToken = 0;
const ACCOUNT_TICK_TIMER: TimerToken = 10;
const ACCOUNT_TICK_MS: u64 = 60000;
struct ClientIoHandler {
client: Arc<Client>,
sync: Arc<EthSync>,
accounts: Arc<AccountService>,
info: Informant,
}
impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize(&self, io: &IoContext<NetSyncMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
io.register_timer(ACCOUNT_TICK_TIMER, ACCOUNT_TICK_MS).expect("Error registering account timer");
}
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
match timer {
INFO_TIMER => { self.info.tick(&self.client, &self.sync); }
ACCOUNT_TICK_TIMER => { self.accounts.tick(); },
_ => {}
}
}
}
/// Parity needs at least 1 test to generate coverage reports correctly.
#[test]
fn if_works() {

View File

@ -15,6 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::str::FromStr;
use std::sync::Arc;
use std::net::SocketAddr;
use ethcore::client::Client;
@ -28,34 +29,58 @@ use die::*;
pub use ethcore_rpc::Server as RpcServer;
#[cfg(feature = "rpc")]
use ethcore_rpc::{RpcServerError, RpcServer as Server};
#[cfg(not(feature = "rpc"))]
pub struct RpcServer;
pub struct Configuration {
pub enabled: bool,
pub interface: String,
pub port: u16,
pub apis: String,
pub cors: Option<String>,
}
pub struct Dependencies {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub secret_store: Arc<AccountService>,
pub miner: Arc<Miner>,
pub logger: Arc<RotatingLogger>,
}
pub fn new(conf: Configuration, deps: Dependencies) -> Option<RpcServer> {
if !conf.enabled {
return None;
}
let interface = match conf.interface.as_str() {
"all" => "0.0.0.0",
"local" => "127.0.0.1",
x => x,
};
let apis = conf.apis.split(',').collect();
let url = format!("{}:{}", interface, conf.port);
let addr = SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid JSONRPC listen host/port given.", url));
Some(setup_rpc_server(deps, &addr, conf.cors, apis))
}
#[cfg(not(feature = "rpc"))]
pub fn setup_rpc_server(
_client: Arc<Client>,
_sync: Arc<EthSync>,
_secret_store: Arc<AccountService>,
_miner: Arc<Miner>,
_deps: Dependencies,
_url: &SocketAddr,
_cors_domain: Option<String>,
_apis: Vec<&str>,
_logger: Arc<RotatingLogger>,
) -> ! {
die!("Your Parity version has been compiled without JSON-RPC support.")
}
#[cfg(feature = "rpc")]
pub fn setup_rpc_server(
client: Arc<Client>,
sync: Arc<EthSync>,
secret_store: Arc<AccountService>,
miner: Arc<Miner>,
deps: Dependencies,
url: &SocketAddr,
cors_domain: Option<String>,
apis: Vec<&str>,
logger: Arc<RotatingLogger>,
) -> RpcServer {
use ethcore_rpc::v1::*;
@ -63,13 +88,13 @@ pub fn setup_rpc_server(
for api in apis.into_iter() {
match api {
"web3" => server.add_delegate(Web3Client::new().to_delegate()),
"net" => server.add_delegate(NetClient::new(&sync).to_delegate()),
"net" => server.add_delegate(NetClient::new(&deps.sync).to_delegate()),
"eth" => {
server.add_delegate(EthClient::new(&client, &sync, &secret_store, &miner).to_delegate());
server.add_delegate(EthFilterClient::new(&client, &miner).to_delegate());
server.add_delegate(EthClient::new(&deps.client, &deps.sync, &deps.secret_store, &deps.miner).to_delegate());
server.add_delegate(EthFilterClient::new(&deps.client, &deps.miner).to_delegate());
},
"personal" => server.add_delegate(PersonalClient::new(&secret_store).to_delegate()),
"ethcore" => server.add_delegate(EthcoreClient::new(&miner, logger.clone()).to_delegate()),
"personal" => server.add_delegate(PersonalClient::new(&deps.secret_store).to_delegate()),
"ethcore" => server.add_delegate(EthcoreClient::new(&deps.miner, deps.logger.clone()).to_delegate()),
_ => {
die!("{}: Invalid API name to be enabled.", api);
},

117
parity/webapp.rs Normal file
View File

@ -0,0 +1,117 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::str::FromStr;
use std::net::SocketAddr;
use ethcore::client::Client;
use ethsync::EthSync;
use ethminer::Miner;
use util::RotatingLogger;
use util::keys::store::{AccountService};
use die::*;
#[cfg(feature = "webapp")]
pub use ethcore_webapp::Server as WebappServer;
#[cfg(not(feature = "webapp"))]
pub struct WebappServer;
pub struct Configuration {
pub enabled: bool,
pub interface: String,
pub port: u16,
pub user: Option<String>,
pub pass: Option<String>,
}
pub struct Dependencies {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub secret_store: Arc<AccountService>,
pub miner: Arc<Miner>,
pub logger: Arc<RotatingLogger>,
}
pub fn new(configuration: Configuration, deps: Dependencies) -> Option<WebappServer> {
if !configuration.enabled {
return None;
}
let interface = match configuration.interface.as_str() {
"all" => "0.0.0.0",
"local" => "127.0.0.1",
x => x,
};
let url = format!("{}:{}", interface, configuration.port);
let addr = SocketAddr::from_str(&url).unwrap_or_else(|_| die!("{}: Invalid Webapps listen host/port given.", url));
let auth = configuration.user.as_ref().map(|username| {
let password = configuration.pass.as_ref().map_or_else(|| {
use rpassword::read_password;
println!("Type password for WebApps server (user: {}): ", username);
let pass = read_password().unwrap();
println!("OK, got it. Starting server...");
pass
}, |pass| pass.to_owned());
(username.to_owned(), password)
});
Some(setup_webapp_server(deps, &addr, auth))
}
#[cfg(not(feature = "webapp"))]
pub fn setup_webapp_server(
_deps: Dependencies,
_url: &SocketAddr,
_auth: Option<(String, String)>,
) -> ! {
die!("Your Parity version has been compiled without WebApps support.")
}
#[cfg(feature = "webapp")]
pub fn setup_webapp_server(
deps: Dependencies,
url: &SocketAddr,
auth: Option<(String, String)>
) -> WebappServer {
use ethcore_rpc::v1::*;
use ethcore_webapp as webapp;
let server = webapp::ServerBuilder::new();
server.add_delegate(Web3Client::new().to_delegate());
server.add_delegate(NetClient::new(&deps.sync).to_delegate());
server.add_delegate(EthClient::new(&deps.client, &deps.sync, &deps.secret_store, &deps.miner).to_delegate());
server.add_delegate(EthFilterClient::new(&deps.client, &deps.miner).to_delegate());
server.add_delegate(PersonalClient::new(&deps.secret_store).to_delegate());
server.add_delegate(EthcoreClient::new(&deps.miner, deps.logger).to_delegate());
let start_result = match auth {
None => {
server.start_unsecure_http(url)
},
Some((username, password)) => {
server.start_basic_auth_http(url, &username, &password)
},
};
match start_result {
Err(webapp::ServerError::IoError(err)) => die_with_io_error(err),
Err(e) => die!("{:?}", e),
Ok(handle) => handle,
}
}