From 6f5f1f5e26831575bee4d395989fb528f48a3272 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 8 Dec 2016 23:21:47 +0100 Subject: [PATCH] light: integrate with sync + serve_light CLI --- Cargo.lock | 1 + Cargo.toml | 1 + ethcore/light/build.rs | 1 + ethcore/light/src/lib.rs | 13 +++ ethcore/light/src/net/buffer_flow.rs | 13 +++ ethcore/light/src/net/mod.rs | 46 ++++++++--- ethcore/light/src/net/status.rs | 33 ++------ ethcore/light/src/net/tests/mod.rs | 10 +-- ethcore/light/src/provider.rs | 1 + parity/cli/config.full.toml | 1 + parity/cli/mod.rs | 5 ++ parity/cli/usage.txt | 1 + parity/configuration.rs | 2 + parity/main.rs | 1 + parity/modules.rs | 30 +++++-- parity/run.rs | 15 +++- parity/sync.rs | 10 ++- sync/build.rs | 6 +- sync/src/api.rs | 118 ++++++++++++++++++++++----- sync/src/lib.rs | 2 +- 20 files changed, 236 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b00701f32..9184d50f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -18,6 +18,7 @@ dependencies = [ "ethcore-ipc-hypervisor 1.2.0", "ethcore-ipc-nano 1.4.0", "ethcore-ipc-tests 0.1.0", + "ethcore-light 1.5.0", "ethcore-logger 1.5.0", "ethcore-rpc 1.5.0", "ethcore-signer 1.5.0", diff --git a/Cargo.toml b/Cargo.toml index 078d2916c..7e989b173 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ rlp = { path = "util/rlp" } ethcore-stratum = { path = "stratum" } ethcore-dapps = { path = "dapps", optional = true } clippy = { version = "0.0.103", optional = true} +ethcore-light = { path = "ethcore/light" } [target.'cfg(windows)'.dependencies] winapi = "0.2" diff --git a/ethcore/light/build.rs b/ethcore/light/build.rs index 43915c1cf..7d4e0064c 100644 --- a/ethcore/light/build.rs +++ b/ethcore/light/build.rs @@ -20,6 +20,7 @@ extern crate ethcore_ipc_codegen; #[cfg(feature = "ipc")] fn main() { ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap(); + ethcore_ipc_codegen::derive_ipc_cond("src/provider.rs", true).unwrap(); } #[cfg(not(feature = "ipc"))] diff --git a/ethcore/light/src/lib.rs b/ethcore/light/src/lib.rs index c00467c4c..7fa2f5911 100644 --- a/ethcore/light/src/lib.rs +++ b/ethcore/light/src/lib.rs @@ -33,8 +33,21 @@ pub mod client; pub mod net; + +#[cfg(not(feature = "ipc"))] pub mod provider; +#[cfg(feature = "ipc")] +pub mod provider { + #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues + include!(concat!(env!("OUT_DIR"), "/provider.rs")); +} + +#[cfg(feature = "ipc")] +pub mod remote { + pub use provider::LightProviderClient; +} + mod types; pub use self::provider::Provider; diff --git a/ethcore/light/src/net/buffer_flow.rs b/ethcore/light/src/net/buffer_flow.rs index 6730c71a7..2371c6ea4 100644 --- a/ethcore/light/src/net/buffer_flow.rs +++ b/ethcore/light/src/net/buffer_flow.rs @@ -22,6 +22,9 @@ //! //! This module provides an interface for configuration of buffer //! flow costs and recharge rates. +//! +//! Current default costs are picked completely arbitrarily, not based +//! on any empirical timings or mathematical models. use request; use super::packet; @@ -273,6 +276,16 @@ impl FlowParams { } } +impl Default for FlowParams { + fn default() -> Self { + FlowParams { + limit: 50_000_000.into(), + costs: CostTable::default(), + recharge: 100_000.into(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 57dc77a86..fd64f4a4b 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -30,13 +30,14 @@ use util::{Bytes, Mutex, RwLock, U256}; use time::SteadyTime; use std::collections::HashMap; +use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use provider::Provider; use request::{self, Request}; use self::buffer_flow::{Buffer, FlowParams}; -use self::context::{IoContext, EventContext, Ctx}; +use self::context::Ctx; use self::error::{Error, Punishment}; mod buffer_flow; @@ -47,16 +48,20 @@ mod status; #[cfg(test)] mod tests; -pub use self::status::{Status, Capabilities, Announcement, NetworkId}; +pub use self::context::{EventContext, IoContext}; +pub use self::status::{Status, Capabilities, Announcement}; const TIMEOUT: TimerToken = 0; const TIMEOUT_INTERVAL_MS: u64 = 1000; -// LPV1 -const PROTOCOL_VERSION: u32 = 1; +// Supported protocol versions. +pub const PROTOCOL_VERSIONS: &'static [u8] = &[1]; -// TODO [rob] make configurable. -const PROTOCOL_ID: [u8; 3] = *b"les"; +// Max protocol version. +pub const MAX_PROTOCOL_VERSION: u8 = 1; + +// Packet count for LES. +pub const PACKET_COUNT: u8 = 15; // packet ID definitions. mod packet { @@ -173,6 +178,8 @@ pub trait Handler: Send + Sync { /// Called when a peer responds with header proofs. Each proof is a block header coupled /// with a series of trie nodes is ascending order by distance from the root. fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec)]) { } + /// Called on abort. + fn on_abort(&self) { } } // a request, the peer who it was made to, and the time it was made. @@ -185,7 +192,7 @@ struct Requested { /// Protocol parameters. pub struct Params { /// Network id. - pub network_id: NetworkId, + pub network_id: u64, /// Buffer flow parameters. pub flow_params: FlowParams, /// Initial capabilities. @@ -203,9 +210,9 @@ pub struct Params { // Locks must be acquired in the order declared, and when holding a read lock // on the peers, only one peer may be held at a time. pub struct LightProtocol { - provider: Box, + provider: Arc, genesis_hash: H256, - network_id: NetworkId, + network_id: u64, pending_peers: RwLock>, peers: RwLock>>, pending_requests: RwLock>, @@ -217,7 +224,7 @@ pub struct LightProtocol { impl LightProtocol { /// Create a new instance of the protocol manager. - pub fn new(provider: Box, params: Params) -> Self { + pub fn new(provider: Arc, params: Params) -> Self { let genesis_hash = provider.chain_info().genesis_hash; LightProtocol { provider: provider, @@ -323,6 +330,23 @@ impl LightProtocol { self.handlers.push(handler); } + /// Signal to handlers that network activity is being aborted + /// and clear peer data. + pub fn abort(&self) { + for handler in &self.handlers { + handler.on_abort(); + } + + // acquire in order and hold. + let mut pending_peers = self.pending_peers.write(); + let mut peers = self.peers.write(); + let mut pending_requests = self.pending_requests.write(); + + pending_peers.clear(); + peers.clear(); + pending_requests.clear(); + } + // Does the common pre-verification of responses before the response itself // is actually decoded: // - check whether peer exists @@ -460,7 +484,7 @@ impl LightProtocol { head_hash: chain_info.best_block_hash, head_num: chain_info.best_block_number, genesis_hash: chain_info.genesis_hash, - protocol_version: PROTOCOL_VERSION, + protocol_version: MAX_PROTOCOL_VERSION as u32, network_id: self.network_id, last_head: None, }; diff --git a/ethcore/light/src/net/status.rs b/ethcore/light/src/net/status.rs index 2c0c5f79a..6d0a76823 100644 --- a/ethcore/light/src/net/status.rs +++ b/ethcore/light/src/net/status.rs @@ -82,26 +82,6 @@ impl Key { } } -/// Network ID structure. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u32)] -pub enum NetworkId { - /// ID for the mainnet - Mainnet = 1, - /// ID for the testnet - Testnet = 0, -} - -impl NetworkId { - fn from_raw(raw: u32) -> Option { - match raw { - 0 => Some(NetworkId::Testnet), - 1 => Some(NetworkId::Mainnet), - _ => None, - } - } -} - // helper for decoding key-value pairs in the handshake or an announcement. struct Parser<'a> { pos: usize, @@ -164,7 +144,7 @@ pub struct Status { /// Protocol version. pub protocol_version: u32, /// Network id of this peer. - pub network_id: NetworkId, + pub network_id: u64, /// Total difficulty of the head of the chain. pub head_td: U256, /// Hash of the best block. @@ -225,8 +205,7 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP let status = Status { protocol_version: try!(parser.expect(Key::ProtocolVersion)), - network_id: try!(parser.expect(Key::NetworkId) - .and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))), + network_id: try!(parser.expect(Key::NetworkId)), head_td: try!(parser.expect(Key::HeadTD)), head_hash: try!(parser.expect(Key::HeadHash)), head_num: try!(parser.expect(Key::HeadNum)), @@ -254,7 +233,7 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec { let mut pairs = Vec::new(); pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version)); - pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32))); + pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64))); pairs.push(encode_pair(Key::HeadTD, &status.head_td)); pairs.push(encode_pair(Key::HeadHash, &status.head_hash)); pairs.push(encode_pair(Key::HeadNum, &status.head_num)); @@ -385,7 +364,7 @@ mod tests { fn full_handshake() { let status = Status { protocol_version: 1, - network_id: NetworkId::Mainnet, + network_id: 1, head_td: U256::default(), head_hash: H256::default(), head_num: 10, @@ -420,7 +399,7 @@ mod tests { fn partial_handshake() { let status = Status { protocol_version: 1, - network_id: NetworkId::Mainnet, + network_id: 1, head_td: U256::default(), head_hash: H256::default(), head_num: 10, @@ -455,7 +434,7 @@ mod tests { fn skip_unknown_keys() { let status = Status { protocol_version: 1, - network_id: NetworkId::Mainnet, + network_id: 1, head_td: U256::default(), head_hash: H256::default(), head_num: 10, diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index e659d0681..30ab2bab2 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -25,7 +25,7 @@ use network::PeerId; use net::buffer_flow::FlowParams; use net::context::IoContext; -use net::status::{Capabilities, Status, NetworkId, write_handshake}; +use net::status::{Capabilities, Status, write_handshake}; use net::{encode_request, LightProtocol, Params, packet}; use provider::Provider; use request::{self, Request, Headers}; @@ -174,8 +174,8 @@ fn setup(flow_params: FlowParams, capabilities: Capabilities) -> (Arc (Arc Status { Status { - protocol_version: ::net::PROTOCOL_VERSION, - network_id: NetworkId::Testnet, + protocol_version: 1, + network_id: 2, head_td: chain_info.total_difficulty, head_hash: chain_info.best_block_hash, head_num: chain_info.best_block_number, diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index 9446aa3f6..37a5cef4d 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -33,6 +33,7 @@ use request; /// or empty vector where appropriate. /// /// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES) +#[cfg_attr(feature = "ipc", ipc(client_ident="LightProviderClient"))] pub trait Provider: Send + Sync { /// Provide current blockchain info. fn chain_info(&self) -> BlockChainInfo; diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml index fcd9a9712..73b5e13be 100644 --- a/parity/cli/config.full.toml +++ b/parity/cli/config.full.toml @@ -32,6 +32,7 @@ warp = true allow_ips = "all" snapshot_peers = 0 max_pending_peers = 64 +serve_light = true reserved_only = false reserved_peers = "./path_to_file" diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 7fcdd2209..2335ccee8 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -135,6 +135,8 @@ usage! { flag_reserved_only: bool = false, or |c: &Config| otry!(c.network).reserved_only.clone(), flag_no_ancient_blocks: bool = false, or |_| None, + flag_serve_light: bool = false, + or |c: &Config| otry!(c.network).serve_light.clone(), // -- API and Console Options // RPC @@ -334,6 +336,7 @@ struct Network { node_key: Option, reserved_peers: Option, reserved_only: Option, + serve_light: Option, } #[derive(Default, Debug, PartialEq, RustcDecodable)] @@ -543,6 +546,7 @@ mod tests { flag_reserved_peers: Some("./path_to_file".into()), flag_reserved_only: false, flag_no_ancient_blocks: false, + flag_serve_light: true, // -- API and Console Options // RPC @@ -713,6 +717,7 @@ mod tests { node_key: None, reserved_peers: Some("./path/to/reserved_peers".into()), reserved_only: Some(true), + serve_light: None, }), rpc: Some(Rpc { disable: Some(true), diff --git a/parity/cli/usage.txt b/parity/cli/usage.txt index b67af6110..58b836aa4 100644 --- a/parity/cli/usage.txt +++ b/parity/cli/usage.txt @@ -97,6 +97,7 @@ Networking Options: --max-pending-peers NUM Allow up to NUM pending connections. (default: {flag_max_pending_peers}) --no-ancient-blocks Disable downloading old blocks after snapshot restoration or warp sync. (default: {flag_no_ancient_blocks}) + --serve-light Experimental: Serve light client peers. (default: {flag_serve_light}) API and Console Options: --no-jsonrpc Disable the JSON-RPC API server. (default: {flag_no_jsonrpc}) diff --git a/parity/configuration.rs b/parity/configuration.rs index 37c699521..94fa20e59 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -275,6 +275,7 @@ impl Configuration { no_periodic_snapshot: self.args.flag_no_periodic_snapshot, check_seal: !self.args.flag_no_seal_check, download_old_blocks: !self.args.flag_no_ancient_blocks, + serve_light: self.args.flag_serve_light, }; Cmd::Run(run_cmd) }; @@ -921,6 +922,7 @@ mod tests { no_periodic_snapshot: false, check_seal: true, download_old_blocks: true, + serve_light: false, })); } diff --git a/parity/main.rs b/parity/main.rs index c125e87f6..17f5ed74b 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -43,6 +43,7 @@ extern crate serde; extern crate serde_json; extern crate rlp; extern crate ethcore_hash_fetch as hash_fetch; +extern crate ethcore_light as light; extern crate ethcore_ipc_hypervisor as hypervisor; extern crate ethcore_rpc; diff --git a/parity/modules.rs b/parity/modules.rs index 39e05a293..5d1d66cd7 100644 --- a/parity/modules.rs +++ b/parity/modules.rs @@ -15,16 +15,22 @@ // along with Parity. If not, see . use std::sync::Arc; +use std::path::Path; + use ethcore::client::BlockChainClient; use hypervisor::Hypervisor; -use ethsync::{SyncConfig, NetworkConfiguration, NetworkError}; +use ethsync::{SyncConfig, NetworkConfiguration, NetworkError, Params}; use ethcore::snapshot::SnapshotService; +use light::Provider; + #[cfg(not(feature="ipc"))] use self::no_ipc_deps::*; + +#[cfg(not(feature="ipc"))] +use ethcore_logger::Config as LogConfig; + #[cfg(feature="ipc")] use self::ipc_deps::*; -use ethcore_logger::Config as LogConfig; -use std::path::Path; #[cfg(feature="ipc")] pub mod service_urls { @@ -36,6 +42,8 @@ pub mod service_urls { pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc"; pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc"; pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc"; + pub const LIGHT_PROVIDER: &'static str = "parity-light-provider.ipc"; + #[cfg(feature="stratum")] pub const STRATUM: &'static str = "parity-stratum.ipc"; #[cfg(feature="stratum")] @@ -75,6 +83,7 @@ mod ipc_deps { pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client}; pub use ipc::IpcSocket; pub use ipc::binary::serialize; + pub use light::remote::LightProviderClient; } #[cfg(feature="ipc")] @@ -124,6 +133,7 @@ pub fn sync net_cfg: NetworkConfiguration, _client: Arc, _snapshot_service: Arc, + _provider: Arc, log_settings: &LogConfig, ) -> Result @@ -141,7 +151,9 @@ pub fn sync &service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap(); let manage_client = generic_client::>( &service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap(); - + let provider_client = generic_client::>( + &service_urls::with_base(&hypervisor.io_path, service_urls::LIGHT_PROVIDER)).unwrap(); + *hypervisor_ref = Some(hypervisor); Ok((sync_client, manage_client, notify_client)) } @@ -154,10 +166,18 @@ pub fn sync net_cfg: NetworkConfiguration, client: Arc, snapshot_service: Arc, + provider: Arc, _log_settings: &LogConfig, ) -> Result { - let eth_sync = try!(EthSync::new(sync_cfg, client, snapshot_service, net_cfg)); + let eth_sync = try!(EthSync::new(Params { + config: sync_cfg, + chain: client, + provider: provider, + snapshot_service: snapshot_service, + network_config: net_cfg, + })); + Ok((eth_sync.clone() as Arc, eth_sync.clone() as Arc, eth_sync.clone() as Arc)) } diff --git a/parity/run.rs b/parity/run.rs index 42a972000..5c02e4021 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -92,6 +92,7 @@ pub struct RunCmd { pub no_periodic_snapshot: bool, pub check_seal: bool, pub download_old_blocks: bool, + pub serve_light: bool, } pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> { @@ -185,6 +186,11 @@ pub fn execute(cmd: RunCmd, logger: Arc) -> Result<(), String> { ); info!("Operating mode: {}", Colour::White.bold().paint(format!("{}", mode))); + if cmd.serve_light { + info!("Configured to serve light client peers. Please note this feature is {}.", + Colour::White.bold().paint("experimental".to_string())); + } + // display warning about using experimental journaldb alorithm if !algorithm.is_stable() { warn!("Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable")); @@ -204,6 +210,7 @@ pub fn execute(cmd: RunCmd, logger: Arc) -> Result<(), String> { sync_config.fork_block = spec.fork_block(); sync_config.warp_sync = cmd.warp_sync; sync_config.download_old_blocks = cmd.download_old_blocks; + sync_config.serve_light = cmd.serve_light; // prepare account provider let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf))); @@ -268,7 +275,13 @@ pub fn execute(cmd: RunCmd, logger: Arc) -> Result<(), String> { // create sync object let (sync_provider, manage_network, chain_notify) = try!(modules::sync( - &mut hypervisor, sync_config, net_conf.into(), client.clone(), snapshot_service.clone(), &cmd.logger_config, + &mut hypervisor, + sync_config, + net_conf.into(), + client.clone(), + snapshot_service.clone(), + client.clone(), + &cmd.logger_config, ).map_err(|e| format!("Sync error: {}", e))); service.add_notify(chain_notify.clone()); diff --git a/parity/sync.rs b/parity/sync.rs index 25f900b78..17f183c80 100644 --- a/parity/sync.rs +++ b/parity/sync.rs @@ -22,6 +22,7 @@ use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService}; use ethcore::client::ChainNotify; use ethcore::client::remote::RemoteClient; use ethcore::snapshot::remote::RemoteSnapshotService; +use light::remote::LightProviderClient; use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration}; use modules::service_urls; use boot; @@ -48,8 +49,15 @@ pub fn main() { let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT)); let remote_snapshot = dependency!(RemoteSnapshotService, &service_urls::with_base(&service_config.io_path, service_urls::SNAPSHOT)); + let remote_provider = dependency!(LightProviderClient, &service_urls::with_base(&service_config.io_path, service_urls::LIGHT_PROVIDER)); - let sync = EthSync::new(service_config.sync, remote_client.service().clone(), remote_snapshot.service().clone(), service_config.net).unwrap(); + let sync = EthSync::new(Params { + config: service_config.sync, + chain: remote_client.service().clone(), + snapshot_service: remote_snapshot.service().clone(), + provider: remote_provider.service().clone(), + network_config: service_config.net + }).unwrap(); let _ = boot::main_thread(); let service_stop = Arc::new(AtomicBool::new(false)); diff --git a/sync/build.rs b/sync/build.rs index c465d5e34..1e08ae652 100644 --- a/sync/build.rs +++ b/sync/build.rs @@ -16,6 +16,10 @@ extern crate ethcore_ipc_codegen; +#[cfg(feature = "ipc")] fn main() { - ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap(); + ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", true).unwrap(); } + +#[cfg(not(feature = "ipc"))] +fn main() {} diff --git a/sync/src/api.rs b/sync/src/api.rs index d8df149c8..acc593fb1 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -33,6 +33,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; use std::str::FromStr; use parking_lot::RwLock; use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT}; +use light::net::{LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext}; pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par"; @@ -47,10 +48,14 @@ pub struct SyncConfig { pub network_id: u64, /// Main "eth" subprotocol name. pub subprotocol_name: [u8; 3], + /// Light "les" subprotocol name. + pub light_subprotocol_name: [u8; 3], /// Fork block to check pub fork_block: Option<(BlockNumber, H256)>, /// Enable snapshot sync pub warp_sync: bool, + /// Enable light client server. + pub serve_light: bool, } impl Default for SyncConfig { @@ -60,8 +65,10 @@ impl Default for SyncConfig { download_old_blocks: true, network_id: 1, subprotocol_name: *b"eth", + light_subprotocol_name: *b"les", fork_block: None, warp_sync: false, + serve_light: false, } } } @@ -116,30 +123,74 @@ pub struct PeerInfo { pub eth_difficulty: Option, } +/// EthSync initialization parameters. +#[cfg_attr(feature = "ipc", derive(Binary))] +pub struct Params { + /// Configuration. + pub config: SyncConfig, + /// Blockchain client. + pub chain: Arc, + /// Snapshot service. + pub snapshot_service: Arc, + /// Light data provider. + pub provider: Arc<::light::Provider>, + /// Network layer configuration. + pub network_config: NetworkConfiguration, +} + /// Ethereum network protocol handler pub struct EthSync { /// Network service network: NetworkService, - /// Protocol handler - handler: Arc, + /// Main (eth/par) protocol handler + sync_handler: Arc, + /// Light (les) protocol handler + light_proto: Option>, /// The main subprotocol name subprotocol_name: [u8; 3], + /// Light subprotocol name. + light_subprotocol_name: [u8; 3], } impl EthSync { /// Creates and register protocol with the network service - pub fn new(config: SyncConfig, chain: Arc, snapshot_service: Arc, network_config: NetworkConfiguration) -> Result, NetworkError> { - let chain_sync = ChainSync::new(config, &*chain); - let service = try!(NetworkService::new(try!(network_config.clone().into_basic()))); - let sync = Arc::new(EthSync{ + pub fn new(params: Params) -> Result, NetworkError> { + let pruning_info = params.chain.pruning_info(); + let light_proto = match params.config.serve_light { + false => None, + true => Some({ + let light_params = LightParams { + network_id: params.config.network_id, + flow_params: Default::default(), + capabilities: Capabilities { + serve_headers: true, + serve_chain_since: Some(pruning_info.earliest_chain), + serve_state_since: Some(pruning_info.earliest_state), + tx_relay: true, + }, + }; + + let mut light_proto = LightProtocol::new(params.provider, light_params); + light_proto.add_handler(Box::new(TxRelay(params.chain.clone()))); + + Arc::new(light_proto) + }) + }; + + let chain_sync = ChainSync::new(params.config, &*params.chain); + let service = try!(NetworkService::new(try!(params.network_config.clone().into_basic()))); + + let sync = Arc::new(EthSync { network: service, - handler: Arc::new(SyncProtocolHandler { + sync_handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), - chain: chain, - snapshot_service: snapshot_service, + chain: params.chain, + snapshot_service: params.snapshot_service, overlay: RwLock::new(HashMap::new()), }), - subprotocol_name: config.subprotocol_name, + light_proto: light_proto, + subprotocol_name: params.config.subprotocol_name, + light_subprotocol_name: params.config.light_subprotocol_name, }); Ok(sync) @@ -150,14 +201,15 @@ impl EthSync { impl SyncProvider for EthSync { /// Get sync status fn status(&self) -> SyncStatus { - self.handler.sync.write().status() + self.sync_handler.sync.write().status() } /// Get sync peers fn peers(&self) -> Vec { + // TODO: [rob] LES peers/peer info self.network.with_context_eval(self.subprotocol_name, |context| { - let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); - self.handler.sync.write().peers(&sync_io) + let sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay); + self.sync_handler.sync.write().peers(&sync_io) }).unwrap_or(Vec::new()) } @@ -166,7 +218,7 @@ impl SyncProvider for EthSync { } fn transactions_stats(&self) -> BTreeMap { - let sync = self.handler.sync.read(); + let sync = self.sync_handler.sync.read(); sync.transactions_stats() .iter() .map(|(hash, stats)| (*hash, stats.into())) @@ -228,8 +280,8 @@ impl ChainNotify for EthSync { _duration: u64) { self.network.with_context(self.subprotocol_name, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); - self.handler.sync.write().chain_new_blocks( + let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay); + self.sync_handler.sync.write().chain_new_blocks( &mut sync_io, &imported, &invalid, @@ -245,19 +297,36 @@ impl ChainNotify for EthSync { Err(err) => warn!("Error starting network: {}", err), _ => {}, } - self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8]) + self.network.register_protocol(self.sync_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8]) .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); // register the warp sync subprotocol - self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) + self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8]) .unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e)); + + // register the light protocol. + if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) { + self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS) + .unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e)); + } } fn stop(&self) { - self.handler.snapshot_service.abort_restore(); + self.sync_handler.snapshot_service.abort_restore(); self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); } } +/// LES event handler. +/// Simply queues transactions from light client peers. +struct TxRelay(Arc); + +impl LightHandler for TxRelay { + fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) { + trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer()); + self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect()) + } +} + impl IpcConfig for ManageNetwork { } impl IpcConfig for SyncProvider { } @@ -304,9 +373,14 @@ impl ManageNetwork for EthSync { fn stop_network(&self) { self.network.with_context(self.subprotocol_name, |context| { - let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay); - self.handler.sync.write().abort(&mut sync_io); + let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay); + self.sync_handler.sync.write().abort(&mut sync_io); }); + + if let Some(light_proto) = self.light_proto.as_ref() { + light_proto.abort(); + } + self.stop(); } @@ -452,4 +526,4 @@ pub struct ServiceConfiguration { pub net: NetworkConfiguration, /// IPC path. pub io_path: String, -} +} \ No newline at end of file diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 801fcbbd5..09f79f16f 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -67,7 +67,7 @@ mod api { #[cfg(not(feature = "ipc"))] mod api; -pub use api::{EthSync, SyncProvider, ManageNetwork, SyncConfig, +pub use api::{EthSync, Params, SyncProvider, ManageNetwork, SyncConfig, ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats}; pub use chain::{SyncStatus, SyncState}; pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};