diff --git a/Cargo.lock b/Cargo.lock index 5201afad5..ea6f0438d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -466,10 +466,16 @@ dependencies = [ "clippy 0.0.79 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.3.0", + "ethcore-ipc 1.3.0", + "ethcore-ipc-codegen 1.3.0", + "ethcore-ipc-nano 1.3.0", "ethcore-util 1.3.0", "heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c92d82636..2b1286ef4 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1014,4 +1014,4 @@ impl MayPanic for Client { } } -impl IpcConfig for Client { } +impl IpcConfig for Arc { } diff --git a/ethcore/src/tests/rpc.rs b/ethcore/src/tests/rpc.rs index 786389905..a4d3621bc 100644 --- a/ethcore/src/tests/rpc.rs +++ b/ethcore/src/tests/rpc.rs @@ -36,7 +36,7 @@ pub fn run_test_worker(scope: &crossbeam::Scope, stop: Arc, socket_p temp.as_path(), Arc::new(Miner::with_spec(get_test_spec())), IoChannel::disconnected()).unwrap(); - let mut worker = nanoipc::Worker::new(&client); + let mut worker = nanoipc::Worker::new(&(client as Arc)); worker.add_reqrep(&socket_path).unwrap(); while !stop.load(Ordering::Relaxed) { worker.poll(); diff --git a/ipc/codegen/src/codegen.rs b/ipc/codegen/src/codegen.rs index 89aae7eae..f56e00430 100644 --- a/ipc/codegen/src/codegen.rs +++ b/ipc/codegen/src/codegen.rs @@ -574,8 +574,6 @@ fn push_client_implementation( interface_map: &InterfaceMap, push: &mut FnMut(Annotatable), ) { - let item_ident = interface_map.ident_map.qualified_ident(builder); - let mut index = -1i32; let items = interface_map.dispatches.iter() .map(|_| { index = index + 1; P(implement_client_method(cx, builder, index as u16, interface_map)) }) @@ -584,12 +582,13 @@ fn push_client_implementation( let generics = client_generics(builder, interface_map); let client_ident = client_qualified_ident(cx, builder, interface_map); let where_clause = &generics.where_clause; + let endpoint = interface_map.endpoint; let handshake_item = quote_impl_item!(cx, pub fn handshake(&self) -> Result<(), ::ipc::Error> { let payload = ::ipc::Handshake { - protocol_version: $item_ident::protocol_version(), - api_version: $item_ident::api_version(), + protocol_version: Arc::<$endpoint>::protocol_version(), + api_version: Arc::<$endpoint>::api_version(), }; ::ipc::invoke( @@ -734,6 +733,7 @@ struct InterfaceMap { pub generics: Generics, pub impl_trait: Option, pub ident_map: IdentMap, + pub endpoint: Ident, } struct IdentMap { @@ -753,10 +753,6 @@ impl IdentMap { builder.id(format!("{}Client", self.original_path.segments[0].identifier)) } } - - fn qualified_ident(&self, builder: &aster::AstBuilder) -> Ident { - builder.id(format!("{}", ::syntax::print::pprust::path_to_string(&self.original_path).replace("<", "::<"))) - } } fn ty_ident_map(original_ty: &P) -> IdentMap { @@ -804,8 +800,13 @@ fn implement_interface( let (handshake_arm, handshake_arm_buf) = implement_handshake_arm(cx); let ty = ty_ident_map(&original_ty).ident(builder); + let interface_endpoint = match *impl_trait { + Some(ref trait_) => builder.id(::syntax::print::pprust::path_to_string(&trait_.path)), + None => ty + }; + let ipc_item = quote_item!(cx, - impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause { + impl $impl_generics ::ipc::IpcInterface<$interface_endpoint> for Arc<$interface_endpoint> $where_clause { fn dispatch(&self, r: &mut R) -> Vec where R: ::std::io::Read { @@ -844,5 +845,6 @@ fn implement_interface( dispatches: dispatch_table, generics: generics.clone(), impl_trait: impl_trait.clone(), + endpoint: interface_endpoint, }) } diff --git a/ipc/hypervisor/src/service.rs.in b/ipc/hypervisor/src/service.rs.in index 2b626d769..1e569f5bd 100644 --- a/ipc/hypervisor/src/service.rs.in +++ b/ipc/hypervisor/src/service.rs.in @@ -68,4 +68,4 @@ impl HypervisorService { } } -impl ::ipc::IpcConfig for HypervisorService {} +impl ::ipc::IpcConfig for Arc {} diff --git a/ipc/nano/src/lib.rs b/ipc/nano/src/lib.rs index 38ff05c5b..9262b3fe8 100644 --- a/ipc/nano/src/lib.rs +++ b/ipc/nano/src/lib.rs @@ -33,7 +33,7 @@ const POLL_TIMEOUT: isize = 100; const CLIENT_CONNECTION_TIMEOUT: isize = 2500; /// Generic worker to handle service (binded) sockets -pub struct Worker where S: IpcInterface { +pub struct Worker where Arc: IpcInterface { service: Arc, sockets: Vec<(Socket, Endpoint)>, polls: Vec, @@ -116,7 +116,7 @@ pub enum SocketError { RequestLink, } -impl Worker where S: IpcInterface { +impl Worker where Arc: IpcInterface { /// New worker over specified `service` pub fn new(service: &Arc) -> Worker { Worker:: { diff --git a/ipc/rpc/src/binary.rs b/ipc/rpc/src/binary.rs index 94d930941..299e5ee73 100644 --- a/ipc/rpc/src/binary.rs +++ b/ipc/rpc/src/binary.rs @@ -610,6 +610,7 @@ impl From<::semver::Version> for BinVersion { } } +binary_fixed_size!(u16); binary_fixed_size!(u64); binary_fixed_size!(u32); binary_fixed_size!(usize); diff --git a/ipc/rpc/src/interface.rs b/ipc/rpc/src/interface.rs index 3d97d59bc..a8d0eba2f 100644 --- a/ipc/rpc/src/interface.rs +++ b/ipc/rpc/src/interface.rs @@ -29,7 +29,7 @@ pub struct Handshake { /// Allows to configure custom version and custom handshake response for /// ipc host -pub trait IpcConfig { +pub trait IpcConfig { /// Current service api version /// Should be increased if any of the methods changes signature fn api_version() -> Version { @@ -60,7 +60,7 @@ pub enum Error { /// Allows implementor to be attached to generic worker and dispatch rpc requests /// over IPC -pub trait IpcInterface: IpcConfig { +pub trait IpcInterface : IpcConfig { /// reads the message from io, dispatches the call and returns serialized result fn dispatch(&self, r: &mut R) -> Vec where R: Read; diff --git a/parity/informant.rs b/parity/informant.rs index 1a78db79b..32d730295 100644 --- a/parity/informant.rs +++ b/parity/informant.rs @@ -20,8 +20,8 @@ use self::ansi_term::Style; use std::time::{Instant, Duration}; use std::ops::{Deref, DerefMut}; -use ethsync::SyncStatus; -use util::{Uint, RwLock, NetworkConfiguration}; +use ethsync::{SyncStatus, NetworkConfiguration}; +use util::{Uint, RwLock}; use ethcore::client::*; use number_prefix::{binary_prefix, Standalone, Prefixed}; diff --git a/parity/main.rs b/parity/main.rs index c1bd6f1ce..2417f062a 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -89,7 +89,7 @@ use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, Bloc use ethcore::error::{ImportError}; use ethcore::service::ClientService; use ethcore::spec::Spec; -use ethsync::EthSync; +use ethsync::{EthSync, NetworkConfiguration}; use ethcore::miner::{Miner, MinerService, ExternalMiner}; use migration::migrate; use informant::Informant; @@ -248,7 +248,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig) let network_settings = Arc::new(conf.network_settings()); // Sync - let sync = EthSync::new(sync_config, client.clone(), net_settings) + let sync = EthSync::new(sync_config, client.clone(), NetworkConfiguration::from(net_settings)) .unwrap_or_else(|e| die_with_error("Sync", ethcore::error::Error::Util(e))); service.set_notify(&(sync.clone() as Arc)); diff --git a/rpc/src/v1/impls/ethcore_set.rs b/rpc/src/v1/impls/ethcore_set.rs index ae4756eac..2c02c51ac 100644 --- a/rpc/src/v1/impls/ethcore_set.rs +++ b/rpc/src/v1/impls/ethcore_set.rs @@ -20,7 +20,6 @@ use jsonrpc_core::*; use ethcore::miner::MinerService; use ethcore::client::MiningBlockChainClient; use ethsync::ManageNetwork; -use util::network::NonReservedPeerMode; use v1::traits::EthcoreSet; use v1::types::{Bytes, H160, U256}; @@ -116,7 +115,7 @@ impl EthcoreSet for EthcoreSetClient where fn add_reserved_peer(&self, params: Params) -> Result { try!(self.active()); from_params::<(String,)>(params).and_then(|(peer,)| { - match take_weak!(self.net).add_reserved_peer(&peer) { + match take_weak!(self.net).add_reserved_peer(peer) { Ok(()) => to_value(&true), Err(_) => Err(Error::invalid_params()), } @@ -126,7 +125,7 @@ impl EthcoreSet for EthcoreSetClient where fn remove_reserved_peer(&self, params: Params) -> Result { try!(self.active()); from_params::<(String,)>(params).and_then(|(peer,)| { - match take_weak!(self.net).remove_reserved_peer(&peer) { + match take_weak!(self.net).remove_reserved_peer(peer) { Ok(()) => to_value(&true), Err(_) => Err(Error::invalid_params()), } @@ -135,13 +134,13 @@ impl EthcoreSet for EthcoreSetClient where fn drop_non_reserved_peers(&self, _: Params) -> Result { try!(self.active()); - take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny); + take_weak!(self.net).deny_unreserved_peers(); to_value(&true) } fn accept_non_reserved_peers(&self, _: Params) -> Result { try!(self.active()); - take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept); + take_weak!(self.net).accept_unreserved_peers(); to_value(&true) } diff --git a/rpc/src/v1/tests/mocked/manage_network.rs b/rpc/src/v1/tests/mocked/manage_network.rs index 5ba243484..10e947df7 100644 --- a/rpc/src/v1/tests/mocked/manage_network.rs +++ b/rpc/src/v1/tests/mocked/manage_network.rs @@ -14,17 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -use ethsync::ManageNetwork; -use util::network::NetworkConfiguration; +use ethsync::{ManageNetwork, NetworkConfiguration}; +use util; pub struct TestManageNetwork; // TODO: rob, gavin (originally introduced this functions) - proper tests and test state impl ManageNetwork for TestManageNetwork { - fn set_non_reserved_mode(&self, _mode: ::util::network::NonReservedPeerMode) {} - fn remove_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) } - fn add_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) } + fn accept_unreserved_peers(&self) { } + fn deny_unreserved_peers(&self) { } + fn remove_reserved_peer(&self, _peer: String) -> Result<(), String> { Ok(()) } + fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { Ok(()) } fn start_network(&self) {} fn stop_network(&self) {} - fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::new_local() } + fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::from(util::NetworkConfiguration::new_local()) } } diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 939381a56..3077ee78b 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -4,9 +4,14 @@ name = "ethsync" version = "1.3.0" license = "GPL-3.0" authors = ["Ethcore . + +extern crate syntex; +extern crate ethcore_ipc_codegen as codegen; + +use std::env; +use std::path::Path; + +fn main() { + let out_dir = env::var_os("OUT_DIR").unwrap(); + + // sync interface + { + let src = Path::new("src/api.rs"); + let intermediate = Path::new(&out_dir).join("api.intermediate.rs"); + let mut registry = syntex::Registry::new(); + codegen::register(&mut registry); + registry.expand("", &src, &intermediate).unwrap(); + + let dst = Path::new(&out_dir).join("api.ipc.rs"); + let mut registry = syntex::Registry::new(); + codegen::register(&mut registry); + registry.expand("", &intermediate, &dst).unwrap(); + } +} diff --git a/sync/src/api.rs b/sync/src/api.rs new file mode 100644 index 000000000..db2ce031a --- /dev/null +++ b/sync/src/api.rs @@ -0,0 +1,273 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use std::ops::*; +use std::sync::Arc; +use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, + NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode}; +use util::{TimerToken, U256, H256, UtilError, Secret, Populatable}; +use ethcore::client::{Client, ChainNotify}; +use io::NetSyncIo; +use chain::{ChainSync, SyncStatus}; +use std::net::{SocketAddr, AddrParseError}; +use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig}; +use std::mem; +use std::collections::VecDeque; +use parking_lot::RwLock; + +/// Ethereum sync protocol +pub const ETH_PROTOCOL: &'static str = "eth"; + +/// Sync configuration +pub struct SyncConfig { + /// Max blocks to download ahead + pub max_download_ahead_blocks: usize, + /// Network ID + pub network_id: U256, +} + +impl Default for SyncConfig { + fn default() -> SyncConfig { + SyncConfig { + max_download_ahead_blocks: 20000, + network_id: U256::from(1), + } + } +} + +binary_fixed_size!(SyncConfig); +binary_fixed_size!(SyncStatus); + +/// Current sync status +pub trait SyncProvider: Send + Sync { + /// Get sync status + fn status(&self) -> SyncStatus; +} + +/// Ethereum network protocol handler +pub struct EthSync { + /// Network service + network: NetworkService, + /// Protocol handler + handler: Arc, +} + +impl EthSync { + /// Creates and register protocol with the network service + pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, UtilError> { + let chain_sync = ChainSync::new(config, chain.deref()); + let service = try!(NetworkService::new(try!(network_config.into_basic()))); + let sync = Arc::new(EthSync{ + network: service, + handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }), + }); + + Ok(sync) + } +} + +#[derive(Ipc)] +#[ipc(client_ident="SyncClient")] +impl SyncProvider for EthSync { + /// Get sync status + fn status(&self) -> SyncStatus { + self.handler.sync.write().status() + } +} + +struct SyncProtocolHandler { + /// Shared blockchain client. TODO: this should evetually become an IPC endpoint + chain: Arc, + /// Sync strategy + sync: RwLock, +} + +impl NetworkProtocolHandler for SyncProtocolHandler { + fn initialize(&self, io: &NetworkContext) { + io.register_timer(0, 1000).expect("Error registering sync timer"); + } + + fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { + ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()), *peer, packet_id, data); + } + + fn connected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } + + fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); + } + + fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + self.sync.write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); + self.sync.write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); + } +} + +impl ChainNotify for EthSync { + fn new_blocks(&self, + imported: Vec, + invalid: Vec, + enacted: Vec, + retracted: Vec, + sealed: Vec) + { + self.network.with_context(ETH_PROTOCOL, |context| { + let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); + self.handler.sync.write().chain_new_blocks( + &mut sync_io, + &imported, + &invalid, + &enacted, + &retracted, + &sealed); + }); + } + + fn start(&self) { + self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); + self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) + .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); + } + + fn stop(&self) { + self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); + } +} + +impl IpcConfig for Arc { } +impl IpcConfig for Arc { } + +/// Trait for managing network +pub trait ManageNetwork : Send + Sync { + /// Set to allow unreserved peers to connect + fn accept_unreserved_peers(&self); + /// Set to deny unreserved peers to connect + fn deny_unreserved_peers(&self); + /// Remove reservation for the peer + fn remove_reserved_peer(&self, peer: String) -> Result<(), String>; + /// Add reserved peer + fn add_reserved_peer(&self, peer: String) -> Result<(), String>; + /// Start network + fn start_network(&self); + /// Stop network + fn stop_network(&self); + /// Query the current configuration of the network + fn network_config(&self) -> NetworkConfiguration; +} + + +#[derive(Ipc)] +#[ipc(client_ident="NetworkManagerClient")] +impl ManageNetwork for EthSync { + fn accept_unreserved_peers(&self) { + self.network.set_non_reserved_mode(NonReservedPeerMode::Accept); + } + + fn deny_unreserved_peers(&self) { + self.network.set_non_reserved_mode(NonReservedPeerMode::Deny); + } + + fn remove_reserved_peer(&self, peer: String) -> Result<(), String> { + self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + } + + fn add_reserved_peer(&self, peer: String) -> Result<(), String> { + self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e)) + } + + fn start_network(&self) { + self.start(); + } + + fn stop_network(&self) { + self.network.with_context(ETH_PROTOCOL, |context| { + let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); + self.handler.sync.write().abort(&mut sync_io); + }); + self.stop(); + } + + fn network_config(&self) -> NetworkConfiguration { + NetworkConfiguration::from(self.network.config().clone()) + } +} + +#[derive(Binary, Debug, Clone)] +/// Network service configuration +pub struct NetworkConfiguration { + /// Directory path to store network configuration. None means nothing will be saved + pub config_path: Option, + /// IP address to listen for incoming connections. Listen to all connections by default + pub listen_address: Option, + /// IP address to advertise. Detected automatically if none. + pub public_address: Option, + /// Port for UDP connections, same as TCP by default + pub udp_port: Option, + /// Enable NAT configuration + pub nat_enabled: bool, + /// Enable discovery + pub discovery_enabled: bool, + /// List of initial node addresses + pub boot_nodes: Vec, + /// Use provided node key instead of default + pub use_secret: Option, + /// Number of connected peers to maintain + pub ideal_peers: u32, + /// List of reserved node addresses. + pub reserved_nodes: Vec, + /// The non-reserved peer mode. + pub allow_non_reserved: bool, +} + +impl NetworkConfiguration { + pub fn into_basic(self) -> Result { + use std::str::FromStr; + + Ok(BasicNetworkConfiguration { + config_path: self.config_path, + listen_address: match self.listen_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) }, + public_address: match self.public_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) }, + udp_port: self.udp_port, + nat_enabled: self.nat_enabled, + discovery_enabled: self.discovery_enabled, + boot_nodes: self.boot_nodes, + use_secret: self.use_secret, + ideal_peers: self.ideal_peers, + reserved_nodes: self.reserved_nodes, + non_reserved_mode: if self.allow_non_reserved { NonReservedPeerMode::Accept } else { NonReservedPeerMode::Deny }, + }) + } +} + +impl From for NetworkConfiguration { + fn from(other: BasicNetworkConfiguration) -> Self { + NetworkConfiguration { + config_path: other.config_path, + listen_address: other.listen_address.and_then(|addr| Some(format!("{}", addr))), + public_address: other.public_address.and_then(|addr| Some(format!("{}", addr))), + udp_port: other.udp_port, + nat_enabled: other.nat_enabled, + discovery_enabled: other.discovery_enabled, + boot_nodes: other.boot_nodes, + use_secret: other.use_secret, + ideal_peers: other.ideal_peers, + reserved_nodes: other.reserved_nodes, + allow_non_reserved: match other.non_reserved_mode { NonReservedPeerMode::Accept => true, _ => false } , + } + } +} diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 17f9e94cc..419407b17 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -14,8 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . - - /// /// `BlockChain` synchronization strategy. /// Syncs to peers and keeps up to date. diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 4464fa8bc..0c164ee6a 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -34,10 +34,9 @@ //! extern crate ethsync; //! use std::env; //! use std::sync::Arc; -//! use util::network::{NetworkConfiguration}; //! use util::io::IoChannel; //! use ethcore::client::{Client, ClientConfig}; -//! use ethsync::{EthSync, SyncConfig, ManageNetwork}; +//! use ethsync::{EthSync, SyncConfig, ManageNetwork, NetworkConfiguration}; //! use ethcore::ethereum; //! use ethcore::miner::{GasPricer, Miner}; //! @@ -56,7 +55,7 @@ //! miner, //! IoChannel::disconnected() //! ).unwrap(); -//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::new()).unwrap(); +//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::from(util::NetworkConfiguration::new())).unwrap(); //! sync.start_network(); //! } //! ``` @@ -71,14 +70,10 @@ extern crate time; extern crate rand; #[macro_use] extern crate heapsize; - -use std::ops::*; -use std::sync::Arc; -use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkConfiguration}; -use util::{TimerToken, U256, H256, RwLock, UtilError}; -use ethcore::client::{Client, ChainNotify}; -use io::NetSyncIo; -use chain::ChainSync; +#[macro_use] +extern crate ethcore_ipc as ipc; +extern crate semver; +extern crate parking_lot; mod chain; mod blocks; @@ -87,166 +82,11 @@ mod io; #[cfg(test)] mod tests; -/// Ethereum sync protocol -pub const ETH_PROTOCOL: &'static str = "eth"; - -/// Sync configuration -pub struct SyncConfig { - /// Max blocks to download ahead - pub max_download_ahead_blocks: usize, - /// Network ID - pub network_id: U256, +mod api { + #![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues + include!(concat!(env!("OUT_DIR"), "/api.ipc.rs")); } -impl Default for SyncConfig { - fn default() -> SyncConfig { - SyncConfig { - max_download_ahead_blocks: 20000, - network_id: U256::from(1), - } - } -} +pub use api::{EthSync, SyncProvider, ManageNetwork, SyncConfig, NetworkConfiguration}; +pub use chain::{SyncStatus, SyncState}; -/// Current sync status -pub trait SyncProvider: Send + Sync { - /// Get sync status - fn status(&self) -> SyncStatus; -} - -/// Ethereum network protocol handler -pub struct EthSync { - /// Network service - network: NetworkService, - /// Protocol handler - handler: Arc, -} - -pub use self::chain::{SyncStatus, SyncState}; - -impl EthSync { - /// Creates and register protocol with the network service - pub fn new(config: SyncConfig, chain: Arc, network_config: NetworkConfiguration) -> Result, UtilError> { - let chain_sync = ChainSync::new(config, chain.deref()); - let service = try!(NetworkService::new(network_config)); - let sync = Arc::new(EthSync{ - network: service, - handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }), - }); - - Ok(sync) - } -} - -impl SyncProvider for EthSync { - /// Get sync status - fn status(&self) -> SyncStatus { - self.handler.sync.read().status() - } -} - -struct SyncProtocolHandler { - /// Shared blockchain client. TODO: this should evetually become an IPC endpoint - chain: Arc, - /// Sync strategy - sync: RwLock, -} - -impl NetworkProtocolHandler for SyncProtocolHandler { - fn initialize(&self, io: &NetworkContext) { - io.register_timer(0, 1000).expect("Error registering sync timer"); - } - - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) { - ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()), *peer, packet_id, data); - } - - fn connected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } - - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { - self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer); - } - - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { - self.sync.write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref())); - self.sync.write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref())); - } -} - -impl ChainNotify for EthSync { - fn new_blocks(&self, - imported: Vec, - invalid: Vec, - enacted: Vec, - retracted: Vec, - sealed: Vec) - { - self.network.with_context(ETH_PROTOCOL, |context| { - let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); - self.handler.sync.write().chain_new_blocks( - &mut sync_io, - &imported, - &invalid, - &enacted, - &retracted, - &sealed); - }); - } - - fn start(&self) { - self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e)); - self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8]) - .unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e)); - } - - fn stop(&self) { - self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e)); - } -} - -/// Trait for managing network -pub trait ManageNetwork : Send + Sync { - /// Set mode for reserved peers (allow/deny peers that are unreserved) - fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode); - /// Remove reservation for the peer - fn remove_reserved_peer(&self, peer: &str) -> Result<(), String>; - /// Add reserved peer - fn add_reserved_peer(&self, peer: &str) -> Result<(), String>; - /// Start network - fn start_network(&self); - /// Stop network - fn stop_network(&self); - /// Query the current configuration of the network - fn network_config(&self) -> NetworkConfiguration; -} - -impl ManageNetwork for EthSync { - fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode) { - self.network.set_non_reserved_mode(mode); - } - - fn remove_reserved_peer(&self, peer: &str) -> Result<(), String> { - self.network.remove_reserved_peer(peer).map_err(|e| format!("{:?}", e)) - } - - fn add_reserved_peer(&self, peer: &str) -> Result<(), String> { - self.network.add_reserved_peer(peer).map_err(|e| format!("{:?}", e)) - } - - fn start_network(&self) { - self.start(); - } - - fn stop_network(&self) { - self.network.with_context(ETH_PROTOCOL, |context| { - let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref()); - self.handler.sync.write().abort(&mut sync_io); - }); - self.stop(); - } - - fn network_config(&self) -> NetworkConfiguration { - self.network.config().clone() - } -}