mild abstraction of RPC dependencies

This commit is contained in:
Robert Habermeier 2017-03-22 20:14:40 +01:00
parent e3d6525d83
commit 35d9a9815e
5 changed files with 177 additions and 142 deletions

View File

@ -59,7 +59,7 @@ impl Default for Configuration {
}
pub struct Dependencies {
pub apis: Arc<rpc_apis::Dependencies>,
pub apis: Arc<rpc_apis::FullDependencies>,
pub client: Arc<Client>,
pub sync: Arc<SyncProvider>,
pub remote: parity_reactor::TokioRemote,
@ -182,7 +182,7 @@ mod server {
} else {
rpc_apis::ApiSet::UnsafeContext
};
let apis = rpc_apis::setup_rpc(deps.stats, deps.apis.clone(), api_set);
let apis = rpc_apis::setup_rpc(deps.stats, &*deps.apis, api_set);
let start_result = match auth {
None => {
server.start_unsecured_http(url, apis, deps.remote)

View File

@ -82,8 +82,8 @@ impl fmt::Display for IpcConfiguration {
}
}
pub struct Dependencies {
pub apis: Arc<rpc_apis::Dependencies>,
pub struct Dependencies<D: rpc_apis::Dependencies> {
pub apis: Arc<D>,
pub remote: TokioRemote,
pub stats: Arc<RpcStats>,
}
@ -109,7 +109,9 @@ impl rpc::IpcMetaExtractor<Metadata> for RpcExtractor {
}
}
pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result<Option<HttpServer>, String> {
pub fn new_http<D>(conf: HttpConfiguration, deps: &Dependencies<D>) -> Result<Option<HttpServer>, String>
where D: rpc_apis::Dependencies
{
if !conf.enabled {
return Ok(None);
}
@ -119,12 +121,14 @@ pub fn new_http(conf: HttpConfiguration, deps: &Dependencies) -> Result<Option<H
Ok(Some(setup_http_rpc_server(deps, &addr, conf.cors, conf.hosts, conf.apis)?))
}
fn setup_apis(apis: ApiSet, deps: &Dependencies) -> MetaIoHandler<Metadata, Middleware> {
rpc_apis::setup_rpc(deps.stats.clone(), deps.apis.clone(), apis)
fn setup_apis<D>(apis: ApiSet, deps: &Dependencies<D>) -> MetaIoHandler<Metadata, Middleware<D::Notifier>>
where D: rpc_apis::Dependencies
{
rpc_apis::setup_rpc(deps.stats.clone(), &*deps.apis, apis)
}
pub fn setup_http_rpc_server(
dependencies: &Dependencies,
pub fn setup_http_rpc_server<D: rpc_apis::Dependencies>(
dependencies: &Dependencies<D>,
url: &SocketAddr,
cors_domains: Option<Vec<String>>,
allowed_hosts: Option<Vec<String>>,
@ -145,12 +149,12 @@ pub fn setup_http_rpc_server(
}
}
pub fn new_ipc(conf: IpcConfiguration, deps: &Dependencies) -> Result<Option<IpcServer>, String> {
pub fn new_ipc<D: rpc_apis::Dependencies>(conf: IpcConfiguration, deps: &Dependencies<D>) -> Result<Option<IpcServer>, String> {
if !conf.enabled { return Ok(None); }
Ok(Some(setup_ipc_rpc_server(deps, &conf.socket_addr, conf.apis)?))
}
pub fn setup_ipc_rpc_server(dependencies: &Dependencies, addr: &str, apis: ApiSet) -> Result<IpcServer, String> {
pub fn setup_ipc_rpc_server<D: rpc_apis::Dependencies>(dependencies: &Dependencies<D>, addr: &str, apis: ApiSet) -> Result<IpcServer, String> {
let handler = setup_apis(apis, dependencies);
let remote = dependencies.remote.clone();
match rpc::start_ipc(addr, handler, remote, RpcExtractor) {

View File

@ -27,7 +27,7 @@ use ethcore::client::Client;
use ethcore::miner::{Miner, ExternalMiner};
use ethcore::snapshot::SnapshotService;
use ethcore_rpc::{Metadata, NetworkSettings};
use ethcore_rpc::informant::{Middleware, RpcStats, ClientNotifier};
use ethcore_rpc::informant::{ActivityNotifier, Middleware, RpcStats, ClientNotifier};
use ethcore_rpc::dispatch::FullDispatcher;
use ethsync::{ManageNetwork, SyncProvider};
use hash_fetch::fetch::Client as FetchClient;
@ -112,25 +112,6 @@ impl FromStr for ApiSet {
}
}
pub struct Dependencies {
pub signer_service: Arc<SignerService>,
pub client: Arc<Client>,
pub snapshot: Arc<SnapshotService>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub secret_store: Arc<AccountProvider>,
pub miner: Arc<Miner>,
pub external_miner: Arc<ExternalMiner>,
pub logger: Arc<RotatingLogger>,
pub settings: Arc<NetworkSettings>,
pub net_service: Arc<ManageNetwork>,
pub updater: Arc<Updater>,
pub geth_compatibility: bool,
pub dapps_interface: Option<String>,
pub dapps_port: Option<u16>,
pub fetch: FetchClient,
}
fn to_modules(apis: &[Api]) -> BTreeMap<String, String> {
let mut modules = BTreeMap::new();
for api in apis {
@ -151,6 +132,145 @@ fn to_modules(apis: &[Api]) -> BTreeMap<String, String> {
modules
}
/// RPC dependencies can be used to initialize RPC endpoints from APIs.
pub trait Dependencies {
type Notifier: ActivityNotifier;
/// Create the activity notifier.
fn activity_notifier(&self) -> Self::Notifier;
/// Extend the given I/O handler with endpoints for each API.
fn extend_with_set(&self, handler: &mut MetaIoHandler<Metadata, Middleware<Self::Notifier>>, apis: &[Api]);
}
/// RPC dependencies for a full node.
pub struct FullDependencies {
pub signer_service: Arc<SignerService>,
pub client: Arc<Client>,
pub snapshot: Arc<SnapshotService>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub secret_store: Arc<AccountProvider>,
pub miner: Arc<Miner>,
pub external_miner: Arc<ExternalMiner>,
pub logger: Arc<RotatingLogger>,
pub settings: Arc<NetworkSettings>,
pub net_service: Arc<ManageNetwork>,
pub updater: Arc<Updater>,
pub geth_compatibility: bool,
pub dapps_interface: Option<String>,
pub dapps_port: Option<u16>,
pub fetch: FetchClient,
}
impl Dependencies for FullDependencies {
type Notifier = ClientNotifier;
fn activity_notifier(&self) -> ClientNotifier {
ClientNotifier {
client: self.client.clone(),
}
}
fn extend_with_set(&self, handler: &mut MetaIoHandler<Metadata, Middleware>, apis: &[Api]) {
use ethcore_rpc::v1::*;
macro_rules! add_signing_methods {
($namespace:ident, $handler:expr, $deps:expr) => {
{
let deps = &$deps;
let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner));
if deps.signer_service.is_enabled() {
$handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store)))
} else {
$handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher)))
}
}
}
}
let dispatcher = FullDispatcher::new(Arc::downgrade(&self.client), Arc::downgrade(&self.miner));
for api in apis {
match *api {
Api::Web3 => {
handler.extend_with(Web3Client::new().to_delegate());
},
Api::Net => {
handler.extend_with(NetClient::new(&self.sync).to_delegate());
},
Api::Eth => {
let client = EthClient::new(
&self.client,
&self.snapshot,
&self.sync,
&self.secret_store,
&self.miner,
&self.external_miner,
EthClientOptions {
pending_nonce_from_queue: self.geth_compatibility,
allow_pending_receipt_query: !self.geth_compatibility,
send_block_number_in_get_work: !self.geth_compatibility,
}
);
handler.extend_with(client.to_delegate());
let filter_client = EthFilterClient::new(&self.client, &self.miner);
handler.extend_with(filter_client.to_delegate());
add_signing_methods!(EthSigning, handler, self);
},
Api::Personal => {
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());
},
Api::Signer => {
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
},
Api::Parity => {
let signer = match self.signer_service.is_enabled() {
true => Some(self.signer_service.clone()),
false => None,
};
handler.extend_with(ParityClient::new(
&self.client,
&self.miner,
&self.sync,
&self.updater,
&self.net_service,
&self.secret_store,
self.logger.clone(),
self.settings.clone(),
signer,
self.dapps_interface.clone(),
self.dapps_port,
).to_delegate());
add_signing_methods!(EthSigning, handler, self);
add_signing_methods!(ParitySigning, handler, self);
},
Api::ParityAccounts => {
handler.extend_with(ParityAccountsClient::new(&self.secret_store).to_delegate());
},
Api::ParitySet => {
handler.extend_with(ParitySetClient::new(
&self.client,
&self.miner,
&self.updater,
&self.net_service,
self.fetch.clone(),
).to_delegate())
},
Api::Traces => {
handler.extend_with(TracesClient::new(&self.client, &self.miner).to_delegate())
},
Api::Rpc => {
let modules = to_modules(&apis);
handler.extend_with(RpcClient::new(modules).to_delegate());
}
}
}
}
}
impl ApiSet {
pub fn list_apis(&self) -> HashSet<Api> {
let mut safe_list = vec![Api::Web3, Api::Net, Api::Eth, Api::Parity, Api::Traces, Api::Rpc]
@ -172,110 +292,12 @@ impl ApiSet {
}
}
macro_rules! add_signing_methods {
($namespace:ident, $handler:expr, $deps:expr) => {
{
let handler = &mut $handler;
let deps = &$deps;
let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner));
if deps.signer_service.is_enabled() {
handler.extend_with($namespace::to_delegate(SigningQueueClient::new(&deps.signer_service, dispatcher, &deps.secret_store)))
} else {
handler.extend_with($namespace::to_delegate(SigningUnsafeClient::new(&deps.secret_store, dispatcher)))
}
}
}
}
pub fn setup_rpc(stats: Arc<RpcStats>, deps: Arc<Dependencies>, apis: ApiSet) -> MetaIoHandler<Metadata, Middleware> {
use ethcore_rpc::v1::*;
let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, ClientNotifier {
client: deps.client.clone(),
}));
pub fn setup_rpc<D: Dependencies>(stats: Arc<RpcStats>, deps: &D, apis: ApiSet) -> MetaIoHandler<Metadata, Middleware<D::Notifier>> {
let mut handler = MetaIoHandler::with_middleware(Middleware::new(stats, deps.activity_notifier()));
// it's turned into vector, cause ont of the cases requires &[]
let apis = apis.list_apis().into_iter().collect::<Vec<_>>();
let dispatcher = FullDispatcher::new(Arc::downgrade(&deps.client), Arc::downgrade(&deps.miner));
deps.extend_with_set(&mut handler, &apis[..]);
for api in &apis {
match *api {
Api::Web3 => {
handler.extend_with(Web3Client::new().to_delegate());
},
Api::Net => {
handler.extend_with(NetClient::new(&deps.sync).to_delegate());
},
Api::Eth => {
let client = EthClient::new(
&deps.client,
&deps.snapshot,
&deps.sync,
&deps.secret_store,
&deps.miner,
&deps.external_miner,
EthClientOptions {
pending_nonce_from_queue: deps.geth_compatibility,
allow_pending_receipt_query: !deps.geth_compatibility,
send_block_number_in_get_work: !deps.geth_compatibility,
}
);
handler.extend_with(client.to_delegate());
let filter_client = EthFilterClient::new(&deps.client, &deps.miner);
handler.extend_with(filter_client.to_delegate());
add_signing_methods!(EthSigning, handler, deps);
},
Api::Personal => {
handler.extend_with(PersonalClient::new(&deps.secret_store, dispatcher.clone(), deps.geth_compatibility).to_delegate());
},
Api::Signer => {
handler.extend_with(SignerClient::new(&deps.secret_store, dispatcher.clone(), &deps.signer_service).to_delegate());
},
Api::Parity => {
let signer = match deps.signer_service.is_enabled() {
true => Some(deps.signer_service.clone()),
false => None,
};
handler.extend_with(ParityClient::new(
&deps.client,
&deps.miner,
&deps.sync,
&deps.updater,
&deps.net_service,
&deps.secret_store,
deps.logger.clone(),
deps.settings.clone(),
signer,
deps.dapps_interface.clone(),
deps.dapps_port,
).to_delegate());
add_signing_methods!(EthSigning, handler, deps);
add_signing_methods!(ParitySigning, handler, deps);
},
Api::ParityAccounts => {
handler.extend_with(ParityAccountsClient::new(&deps.secret_store).to_delegate());
},
Api::ParitySet => {
handler.extend_with(ParitySetClient::new(
&deps.client,
&deps.miner,
&deps.updater,
&deps.net_service,
deps.fetch.clone(),
).to_delegate())
},
Api::Traces => {
handler.extend_with(TracesClient::new(&deps.client, &deps.miner).to_delegate())
},
Api::Rpc => {
let modules = to_modules(&apis);
handler.extend_with(RpcClient::new(modules).to_delegate());
}
}
}
handler
}

View File

@ -498,7 +498,7 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
// set up dependencies for rpc servers
let rpc_stats = Arc::new(informant::RpcStats::default());
let signer_path = cmd.signer_conf.signer_path.clone();
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
let deps_for_rpc_apis = Arc::new(rpc_apis::FullDependencies {
signer_service: Arc::new(rpc_apis::SignerService::new(move || {
signer::generate_new_token(signer_path.clone()).map_err(|e| format!("{:?}", e))
}, cmd.ui_address)),
@ -553,7 +553,8 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
remote: event_loop.raw_remote(),
rpc_stats: rpc_stats.clone(),
};
let signer_server = signer::start(cmd.signer_conf.clone(), signer_deps)?;
let signing_queue = deps_for_rpc_apis.signer_service.queue();
let signer_server = signer::start(cmd.signer_conf.clone(), signing_queue, signer_deps)?;
// secret store key server
let secretstore_deps = secretstore::Dependencies { };

View File

@ -23,7 +23,7 @@ pub use ethcore_signer::Server as SignerServer;
use ansi_term::Colour;
use dir::default_data_path;
use ethcore_rpc::informant::RpcStats;
use ethcore_rpc;
use ethcore_rpc::{self, ConfirmationsQueue};
use ethcore_signer as signer;
use helpers::replace_home;
use parity_reactor::TokioRemote;
@ -55,8 +55,8 @@ impl Default for Configuration {
}
}
pub struct Dependencies {
pub apis: Arc<rpc_apis::Dependencies>,
pub struct Dependencies<D: rpc_apis::Dependencies> {
pub apis: Arc<D>,
pub remote: TokioRemote,
pub rpc_stats: Arc<RpcStats>,
}
@ -77,11 +77,15 @@ impl signer::MetaExtractor<ethcore_rpc::Metadata> for StandardExtractor {
}
}
pub fn start(conf: Configuration, deps: Dependencies) -> Result<Option<SignerServer>, String> {
pub fn start<D: rpc_apis::Dependencies>(
conf: Configuration,
queue: Arc<ConfirmationsQueue>,
deps: Dependencies<D>,
) -> Result<Option<SignerServer>, String> {
if !conf.enabled {
Ok(None)
} else {
Ok(Some(do_start(conf, deps)?))
Ok(Some(do_start(conf, queue, deps)?))
}
}
@ -125,14 +129,18 @@ pub fn generate_new_token(path: String) -> io::Result<String> {
Ok(code)
}
fn do_start(conf: Configuration, deps: Dependencies) -> Result<SignerServer, String> {
fn do_start<D: rpc_apis::Dependencies>(
conf: Configuration,
queue: Arc<ConfirmationsQueue>,
deps: Dependencies<D>
) -> Result<SignerServer, String> {
let addr = format!("{}:{}", conf.interface, conf.port)
.parse()
.map_err(|_| format!("Invalid port specified: {}", conf.port))?;
let start_result = {
let server = signer::ServerBuilder::new(
deps.apis.signer_service.queue(),
queue,
codes_path(conf.signer_path),
);
if conf.skip_origin_validation {
@ -141,7 +149,7 @@ fn do_start(conf: Configuration, deps: Dependencies) -> Result<SignerServer, Str
}
let server = server.skip_origin_validation(conf.skip_origin_validation);
let server = server.stats(deps.rpc_stats.clone());
let handler = rpc_apis::setup_rpc(deps.rpc_stats, deps.apis, rpc_apis::ApiSet::SafeContext);
let handler = rpc_apis::setup_rpc(deps.rpc_stats, &*deps.apis, rpc_apis::ApiSet::SafeContext);
let remote = deps.remote.clone();
server.start_with_extractor(addr, handler, remote, StandardExtractor)
};