From 507a4ea26c341875c15192ddacc93d4762589bb9 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Fri, 15 Jul 2016 15:32:29 +0200 Subject: [PATCH] basic layout of sync executable and minor fixes in the api --- Cargo.lock | 9 ++ Cargo.toml | 5 ++ ipc/hypervisor/src/lib.rs | 4 +- ipc/hypervisor/src/service.rs.in | 3 + ipc/nano/src/lib.rs | 4 +- parity/sync/main.rs | 137 +++++++++++++++++++++++++++++++ sync/src/api.rs | 6 +- 7 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 parity/sync/main.rs diff --git a/Cargo.lock b/Cargo.lock index ea6f0438d..26a7946c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,6 +30,7 @@ dependencies = [ "rpassword 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "schedule_recv 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1129,6 +1130,14 @@ dependencies = [ "semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "schedule_recv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "semver" version = "0.1.20" diff --git a/Cargo.toml b/Cargo.toml index 77ea5e9c9..9761a2147 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ ethcore-ipc-hypervisor = { path = "ipc/hypervisor" } json-ipc-server = { git = "https://github.com/ethcore/json-ipc-server.git" } ethcore-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.79", optional = true} +schedule_recv = "0.1" [target.'cfg(windows)'.dependencies] winapi = "0.2" @@ -61,6 +62,10 @@ travis-nightly = ["ethcore/json-tests", "dev"] path = "parity/main.rs" name = "parity" +[[bin]] +path = "parity/sync/main.rs" +name = "sync" + [profile.release] debug = true lto = false diff --git a/ipc/hypervisor/src/lib.rs b/ipc/hypervisor/src/lib.rs index 69bbbffae..f67996189 100644 --- a/ipc/hypervisor/src/lib.rs +++ b/ipc/hypervisor/src/lib.rs @@ -33,11 +33,12 @@ use service::{HypervisorService, IpcModuleId}; use std::process::{Command,Child}; use std::collections::HashMap; -pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID}; +pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID}; type BinaryId = &'static str; const CLIENT_BINARY: BinaryId = "client"; +const SYNC_BINARY: BinaryId = "sync"; pub struct Hypervisor { ipc_addr: String, @@ -76,6 +77,7 @@ impl Hypervisor { fn match_module(module_id: &IpcModuleId) -> Option { match *module_id { CLIENT_MODULE_ID => Some(CLIENT_BINARY), + SYNC_MODULE_ID => Some(SYNC_BINARY), // none means the module is inside the main binary _ => None } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 1e569f5bd..9fcb09f2b 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -26,6 +26,9 @@ pub type IpcModuleId = u64; /// Blockhain database module id pub const CLIENT_MODULE_ID: IpcModuleId = 2000; +/// Sync module id +pub const SYNC_MODULE_ID: IpcModuleId = 2100; + /// IPC service that handles module management pub struct HypervisorService { check_list: RwLock>, diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 9262b3fe8..f835566ed 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -54,9 +54,9 @@ impl GuardedSocket where S: WithSocket { } impl Deref for GuardedSocket where S: WithSocket { - type Target = S; + type Target = Arc; - fn deref(&self) -> &S { + fn deref(&self) -> &Arc { &self.client } } diff --git a/parity/sync/main.rs b/parity/sync/main.rs new file mode 100644 index 000000000..2990dc24b --- /dev/null +++ b/parity/sync/main.rs @@ -0,0 +1,137 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Parity database ipc service + +extern crate ethcore_ipc_nano as nanoipc; +extern crate ethcore_ipc_hypervisor as hypervisor; +extern crate ethcore_ipc as ipc; +extern crate ctrlc; +#[macro_use] extern crate log; +extern crate ethsync; +extern crate rustc_serialize; +extern crate docopt; +extern crate ethcore; +extern crate ethcore_util as util; + +use std::sync::Arc; +use hypervisor::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID, HYPERVISOR_IPC_URL}; +use ctrlc::CtrlC; +use std::sync::atomic::*; +use docopt::Docopt; +use ethcore::client::{BlockChainClient, RemoteClient}; +use nanoipc::*; +use ethsync::{SyncProvider, SyncConfig, EthSync, ManageNetwork, NetworkConfiguration}; +use std::thread; +use util::numbers::*; +use std::str::FromStr; + +const USAGE: &'static str = " +Ethcore sync service +Usage: + sync [options] + +Options: + --public-address IP Public address. + --boot-nodes LIST List of boot nodes. + --reserved-nodes LIST List of reserved peers, + --secret HEX Use node key hash + --udp-port UDP port +"; + +#[derive(Debug, RustcDecodable)] +struct Args { + arg_network_id: String, + arg_listen_address: String, + arg_nat_enabled: bool, + arg_discovery_enabled: bool, + arg_ideal_peers: u32, + arg_config_path: String, + arg_client_url: String, + arg_allow_non_reserved: bool, + flag_public_address: Option, + flag_secret: Option, + flag_boot_nodes: Vec, + flag_reserved_nodes: Vec, + flag_udp_port: Option, +} + +impl Args { + pub fn into_config(self) -> (SyncConfig, NetworkConfiguration, String) { + let mut sync_config = SyncConfig::default(); + sync_config.network_id = U256::from_str(&self.arg_network_id).unwrap(); + + let mut network_config = NetworkConfiguration { + udp_port: self.flag_udp_port, + nat_enabled: self.arg_nat_enabled, + boot_nodes: self.flag_boot_nodes, + listen_address: Some(self.arg_listen_address), + public_address: self.flag_public_address, + use_secret: self.flag_secret.as_ref().map(|s| H256::from_str(s).unwrap()), + discovery_enabled: self.arg_discovery_enabled, + ideal_peers: self.arg_ideal_peers, + config_path: Some(self.arg_config_path), + reserved_nodes: self.flag_reserved_nodes, + allow_non_reserved: self.arg_allow_non_reserved, + }; + + (sync_config, network_config, self.arg_client_url) + } +} + +fn run_service(addr: &str, stop_guard: Arc, service: &Arc) where Arc: IpcInterface { + let socket_url = addr.to_owned(); + let service_spawn = service.clone(); + std::thread::spawn(move || { + let mut worker = nanoipc::Worker::::new(&service_spawn); + worker.add_reqrep(&socket_url).unwrap(); + + while !stop_guard.load(Ordering::Relaxed) { + worker.poll(); + } + }); +} + +fn main() { + use std::ops::Deref; + + let args: Args = Docopt::new(USAGE) + .and_then(|d| d.decode()) + .unwrap_or_else(|e| e.exit()); + let (sync_config, network_config, client_url) = args.into_config(); + let remote_client = nanoipc::init_client::>(&client_url).unwrap(); + + remote_client.handshake().unwrap(); + + let stop = Arc::new(AtomicBool::new(false)); + let sync = EthSync::new(sync_config, remote_client.clone(), network_config).unwrap(); + + run_service("ipc:///tmp/parity-sync.ipc", stop.clone(), &(sync.clone() as Arc)); + run_service("ipc:///tmp/parity-manage-net.ipc", stop.clone(), &(sync.clone() as Arc)); + + let hypervisor_client = nanoipc::init_client::>(HYPERVISOR_IPC_URL).unwrap(); + hypervisor_client.handshake().unwrap(); + hypervisor_client.module_ready(SYNC_MODULE_ID); + + let terminate_stop = stop.clone(); + CtrlC::set_handler(move || { + terminate_stop.store(true, Ordering::Relaxed); + }); + + while !stop.load(Ordering::Relaxed) { + thread::park_timeout(std::time::Duration::from_millis(1000)); + } +} diff --git a/sync/src/api.rs b/sync/src/api.rs index db2ce031a..4c43d17fc 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode}; use util::{TimerToken, U256, H256, UtilError, Secret, Populatable}; -use ethcore::client::{Client, ChainNotify}; +use ethcore::client::{Client, BlockChainClient, ChainNotify}; use io::NetSyncIo; use chain::{ChainSync, SyncStatus}; use std::net::{SocketAddr, AddrParseError}; @@ -67,7 +67,7 @@ pub struct EthSync { impl EthSync { /// Creates and register protocol with the network service - pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, UtilError> { + pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, UtilError> { let chain_sync = ChainSync::new(config, chain.deref()); let service = try!(NetworkService::new(try!(network_config.into_basic()))); let sync = Arc::new(EthSync{ @@ -90,7 +90,7 @@ impl SyncProvider for EthSync { struct SyncProtocolHandler { /// Shared blockchain client. TODO: this should evetually become an IPC endpoint - chain: Arc, + chain: Arc, /// Sync strategy sync: RwLock, }