Sync IPC interface (#1584)
* chain notify trait * replaced network service with io service * fix ethcore crate warnings * refactored network service without generic * ethcore fix * ethsync refactoring * proper linking of notify * manage network interface * rpc crate rebinding * full rewire * sync internal io service * fix deadlock * fix warnings and removed async io * sync imported message propagation * fix rpc warnings * binart warnings * test fixes * rpc mocks and tests * fix util doctest * fix message name and removed empty notifier * pointers mess & dark mode fixed * fixed sync doctest * added few warnings * fix review * new convention match * fix error unwraps * doctest fix * basic library re-layout * missing files to relayout * duplicating network config on sync level * binary serializers for config * ipc endpoint for manage * ipc endpoint for sync * handshake sorting out * sorting out the multi-interface dispatch scenario * fixing tests * fix doctest
This commit is contained in:
parent
8d0e05adb7
commit
44bc8a08fb
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -466,10 +466,16 @@ dependencies = [
|
||||
"clippy 0.0.79 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"env_logger 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"ethcore 1.3.0",
|
||||
"ethcore-ipc 1.3.0",
|
||||
"ethcore-ipc-codegen 1.3.0",
|
||||
"ethcore-ipc-nano 1.3.0",
|
||||
"ethcore-util 1.3.0",
|
||||
"heapsize 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"parking_lot 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"semver 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"syntex 0.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"time 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
]
|
||||
|
||||
|
@ -1014,4 +1014,4 @@ impl MayPanic for Client {
|
||||
}
|
||||
}
|
||||
|
||||
impl IpcConfig for Client { }
|
||||
impl IpcConfig<BlockChainClient> for Arc<BlockChainClient> { }
|
||||
|
@ -36,7 +36,7 @@ pub fn run_test_worker(scope: &crossbeam::Scope, stop: Arc<AtomicBool>, socket_p
|
||||
temp.as_path(),
|
||||
Arc::new(Miner::with_spec(get_test_spec())),
|
||||
IoChannel::disconnected()).unwrap();
|
||||
let mut worker = nanoipc::Worker::new(&client);
|
||||
let mut worker = nanoipc::Worker::new(&(client as Arc<BlockChainClient>));
|
||||
worker.add_reqrep(&socket_path).unwrap();
|
||||
while !stop.load(Ordering::Relaxed) {
|
||||
worker.poll();
|
||||
|
@ -574,8 +574,6 @@ fn push_client_implementation(
|
||||
interface_map: &InterfaceMap,
|
||||
push: &mut FnMut(Annotatable),
|
||||
) {
|
||||
let item_ident = interface_map.ident_map.qualified_ident(builder);
|
||||
|
||||
let mut index = -1i32;
|
||||
let items = interface_map.dispatches.iter()
|
||||
.map(|_| { index = index + 1; P(implement_client_method(cx, builder, index as u16, interface_map)) })
|
||||
@ -584,12 +582,13 @@ fn push_client_implementation(
|
||||
let generics = client_generics(builder, interface_map);
|
||||
let client_ident = client_qualified_ident(cx, builder, interface_map);
|
||||
let where_clause = &generics.where_clause;
|
||||
let endpoint = interface_map.endpoint;
|
||||
|
||||
let handshake_item = quote_impl_item!(cx,
|
||||
pub fn handshake(&self) -> Result<(), ::ipc::Error> {
|
||||
let payload = ::ipc::Handshake {
|
||||
protocol_version: $item_ident::protocol_version(),
|
||||
api_version: $item_ident::api_version(),
|
||||
protocol_version: Arc::<$endpoint>::protocol_version(),
|
||||
api_version: Arc::<$endpoint>::api_version(),
|
||||
};
|
||||
|
||||
::ipc::invoke(
|
||||
@ -734,6 +733,7 @@ struct InterfaceMap {
|
||||
pub generics: Generics,
|
||||
pub impl_trait: Option<TraitRef>,
|
||||
pub ident_map: IdentMap,
|
||||
pub endpoint: Ident,
|
||||
}
|
||||
|
||||
struct IdentMap {
|
||||
@ -753,10 +753,6 @@ impl IdentMap {
|
||||
builder.id(format!("{}Client", self.original_path.segments[0].identifier))
|
||||
}
|
||||
}
|
||||
|
||||
fn qualified_ident(&self, builder: &aster::AstBuilder) -> Ident {
|
||||
builder.id(format!("{}", ::syntax::print::pprust::path_to_string(&self.original_path).replace("<", "::<")))
|
||||
}
|
||||
}
|
||||
|
||||
fn ty_ident_map(original_ty: &P<Ty>) -> IdentMap {
|
||||
@ -804,8 +800,13 @@ fn implement_interface(
|
||||
let (handshake_arm, handshake_arm_buf) = implement_handshake_arm(cx);
|
||||
|
||||
let ty = ty_ident_map(&original_ty).ident(builder);
|
||||
let interface_endpoint = match *impl_trait {
|
||||
Some(ref trait_) => builder.id(::syntax::print::pprust::path_to_string(&trait_.path)),
|
||||
None => ty
|
||||
};
|
||||
|
||||
let ipc_item = quote_item!(cx,
|
||||
impl $impl_generics ::ipc::IpcInterface<$ty> for $ty $where_clause {
|
||||
impl $impl_generics ::ipc::IpcInterface<$interface_endpoint> for Arc<$interface_endpoint> $where_clause {
|
||||
fn dispatch<R>(&self, r: &mut R) -> Vec<u8>
|
||||
where R: ::std::io::Read
|
||||
{
|
||||
@ -844,5 +845,6 @@ fn implement_interface(
|
||||
dispatches: dispatch_table,
|
||||
generics: generics.clone(),
|
||||
impl_trait: impl_trait.clone(),
|
||||
endpoint: interface_endpoint,
|
||||
})
|
||||
}
|
||||
|
@ -68,4 +68,4 @@ impl HypervisorService {
|
||||
}
|
||||
}
|
||||
|
||||
impl ::ipc::IpcConfig for HypervisorService {}
|
||||
impl ::ipc::IpcConfig<HypervisorService> for Arc<HypervisorService> {}
|
||||
|
@ -33,7 +33,7 @@ const POLL_TIMEOUT: isize = 100;
|
||||
const CLIENT_CONNECTION_TIMEOUT: isize = 2500;
|
||||
|
||||
/// Generic worker to handle service (binded) sockets
|
||||
pub struct Worker<S> where S: IpcInterface<S> {
|
||||
pub struct Worker<S: ?Sized> where Arc<S>: IpcInterface<S> {
|
||||
service: Arc<S>,
|
||||
sockets: Vec<(Socket, Endpoint)>,
|
||||
polls: Vec<PollFd>,
|
||||
@ -116,7 +116,7 @@ pub enum SocketError {
|
||||
RequestLink,
|
||||
}
|
||||
|
||||
impl<S> Worker<S> where S: IpcInterface<S> {
|
||||
impl<S: ?Sized> Worker<S> where Arc<S>: IpcInterface<S> {
|
||||
/// New worker over specified `service`
|
||||
pub fn new(service: &Arc<S>) -> Worker<S> {
|
||||
Worker::<S> {
|
||||
|
@ -610,6 +610,7 @@ impl From<::semver::Version> for BinVersion {
|
||||
}
|
||||
}
|
||||
|
||||
binary_fixed_size!(u16);
|
||||
binary_fixed_size!(u64);
|
||||
binary_fixed_size!(u32);
|
||||
binary_fixed_size!(usize);
|
||||
|
@ -29,7 +29,7 @@ pub struct Handshake {
|
||||
|
||||
/// Allows to configure custom version and custom handshake response for
|
||||
/// ipc host
|
||||
pub trait IpcConfig {
|
||||
pub trait IpcConfig<I: ?Sized> {
|
||||
/// Current service api version
|
||||
/// Should be increased if any of the methods changes signature
|
||||
fn api_version() -> Version {
|
||||
@ -60,7 +60,7 @@ pub enum Error {
|
||||
|
||||
/// Allows implementor to be attached to generic worker and dispatch rpc requests
|
||||
/// over IPC
|
||||
pub trait IpcInterface<T>: IpcConfig {
|
||||
pub trait IpcInterface<I :?Sized> : IpcConfig<I> {
|
||||
/// reads the message from io, dispatches the call and returns serialized result
|
||||
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;
|
||||
|
||||
|
@ -20,8 +20,8 @@ use self::ansi_term::Style;
|
||||
|
||||
use std::time::{Instant, Duration};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use ethsync::SyncStatus;
|
||||
use util::{Uint, RwLock, NetworkConfiguration};
|
||||
use ethsync::{SyncStatus, NetworkConfiguration};
|
||||
use util::{Uint, RwLock};
|
||||
use ethcore::client::*;
|
||||
use number_prefix::{binary_prefix, Standalone, Prefixed};
|
||||
|
||||
|
@ -89,7 +89,7 @@ use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, Bloc
|
||||
use ethcore::error::{ImportError};
|
||||
use ethcore::service::ClientService;
|
||||
use ethcore::spec::Spec;
|
||||
use ethsync::EthSync;
|
||||
use ethsync::{EthSync, NetworkConfiguration};
|
||||
use ethcore::miner::{Miner, MinerService, ExternalMiner};
|
||||
use migration::migrate;
|
||||
use informant::Informant;
|
||||
@ -248,7 +248,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
|
||||
let network_settings = Arc::new(conf.network_settings());
|
||||
|
||||
// Sync
|
||||
let sync = EthSync::new(sync_config, client.clone(), net_settings)
|
||||
let sync = EthSync::new(sync_config, client.clone(), NetworkConfiguration::from(net_settings))
|
||||
.unwrap_or_else(|e| die_with_error("Sync", ethcore::error::Error::Util(e)));
|
||||
service.set_notify(&(sync.clone() as Arc<ChainNotify>));
|
||||
|
||||
|
@ -20,7 +20,6 @@ use jsonrpc_core::*;
|
||||
use ethcore::miner::MinerService;
|
||||
use ethcore::client::MiningBlockChainClient;
|
||||
use ethsync::ManageNetwork;
|
||||
use util::network::NonReservedPeerMode;
|
||||
use v1::traits::EthcoreSet;
|
||||
use v1::types::{Bytes, H160, U256};
|
||||
|
||||
@ -116,7 +115,7 @@ impl<C, M> EthcoreSet for EthcoreSetClient<C, M> where
|
||||
fn add_reserved_peer(&self, params: Params) -> Result<Value, Error> {
|
||||
try!(self.active());
|
||||
from_params::<(String,)>(params).and_then(|(peer,)| {
|
||||
match take_weak!(self.net).add_reserved_peer(&peer) {
|
||||
match take_weak!(self.net).add_reserved_peer(peer) {
|
||||
Ok(()) => to_value(&true),
|
||||
Err(_) => Err(Error::invalid_params()),
|
||||
}
|
||||
@ -126,7 +125,7 @@ impl<C, M> EthcoreSet for EthcoreSetClient<C, M> where
|
||||
fn remove_reserved_peer(&self, params: Params) -> Result<Value, Error> {
|
||||
try!(self.active());
|
||||
from_params::<(String,)>(params).and_then(|(peer,)| {
|
||||
match take_weak!(self.net).remove_reserved_peer(&peer) {
|
||||
match take_weak!(self.net).remove_reserved_peer(peer) {
|
||||
Ok(()) => to_value(&true),
|
||||
Err(_) => Err(Error::invalid_params()),
|
||||
}
|
||||
@ -135,13 +134,13 @@ impl<C, M> EthcoreSet for EthcoreSetClient<C, M> where
|
||||
|
||||
fn drop_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
|
||||
try!(self.active());
|
||||
take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Deny);
|
||||
take_weak!(self.net).deny_unreserved_peers();
|
||||
to_value(&true)
|
||||
}
|
||||
|
||||
fn accept_non_reserved_peers(&self, _: Params) -> Result<Value, Error> {
|
||||
try!(self.active());
|
||||
take_weak!(self.net).set_non_reserved_mode(NonReservedPeerMode::Accept);
|
||||
take_weak!(self.net).accept_unreserved_peers();
|
||||
to_value(&true)
|
||||
}
|
||||
|
||||
|
@ -14,17 +14,18 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use ethsync::ManageNetwork;
|
||||
use util::network::NetworkConfiguration;
|
||||
use ethsync::{ManageNetwork, NetworkConfiguration};
|
||||
use util;
|
||||
|
||||
pub struct TestManageNetwork;
|
||||
|
||||
// TODO: rob, gavin (originally introduced this functions) - proper tests and test state
|
||||
impl ManageNetwork for TestManageNetwork {
|
||||
fn set_non_reserved_mode(&self, _mode: ::util::network::NonReservedPeerMode) {}
|
||||
fn remove_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) }
|
||||
fn add_reserved_peer(&self, _peer: &str) -> Result<(), String> { Ok(()) }
|
||||
fn accept_unreserved_peers(&self) { }
|
||||
fn deny_unreserved_peers(&self) { }
|
||||
fn remove_reserved_peer(&self, _peer: String) -> Result<(), String> { Ok(()) }
|
||||
fn add_reserved_peer(&self, _peer: String) -> Result<(), String> { Ok(()) }
|
||||
fn start_network(&self) {}
|
||||
fn stop_network(&self) {}
|
||||
fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::new_local() }
|
||||
fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::from(util::NetworkConfiguration::new_local()) }
|
||||
}
|
||||
|
@ -4,9 +4,14 @@ name = "ethsync"
|
||||
version = "1.3.0"
|
||||
license = "GPL-3.0"
|
||||
authors = ["Ethcore <admin@ethcore.io"]
|
||||
build = "build.rs"
|
||||
|
||||
[lib]
|
||||
|
||||
[build-dependencies]
|
||||
syntex = "0.33"
|
||||
ethcore-ipc-codegen = { path = "../ipc/codegen" }
|
||||
|
||||
[dependencies]
|
||||
ethcore-util = { path = "../util" }
|
||||
ethcore = { path = "../ethcore" }
|
||||
@ -16,6 +21,10 @@ env_logger = "0.3"
|
||||
time = "0.1.34"
|
||||
rand = "0.3.13"
|
||||
heapsize = "0.3"
|
||||
ethcore-ipc = { path = "../ipc/rpc" }
|
||||
semver = "0.2"
|
||||
ethcore-ipc-nano = { path = "../ipc/nano" }
|
||||
parking_lot = "0.2.6"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
39
sync/build.rs
Normal file
39
sync/build.rs
Normal file
@ -0,0 +1,39 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
extern crate syntex;
|
||||
extern crate ethcore_ipc_codegen as codegen;
|
||||
|
||||
use std::env;
|
||||
use std::path::Path;
|
||||
|
||||
fn main() {
|
||||
let out_dir = env::var_os("OUT_DIR").unwrap();
|
||||
|
||||
// sync interface
|
||||
{
|
||||
let src = Path::new("src/api.rs");
|
||||
let intermediate = Path::new(&out_dir).join("api.intermediate.rs");
|
||||
let mut registry = syntex::Registry::new();
|
||||
codegen::register(&mut registry);
|
||||
registry.expand("", &src, &intermediate).unwrap();
|
||||
|
||||
let dst = Path::new(&out_dir).join("api.ipc.rs");
|
||||
let mut registry = syntex::Registry::new();
|
||||
codegen::register(&mut registry);
|
||||
registry.expand("", &intermediate, &dst).unwrap();
|
||||
}
|
||||
}
|
273
sync/src/api.rs
Normal file
273
sync/src/api.rs
Normal file
@ -0,0 +1,273 @@
|
||||
// Copyright 2015, 2016 Ethcore (UK) Ltd.
|
||||
// This file is part of Parity.
|
||||
|
||||
// Parity is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// Parity is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::ops::*;
|
||||
use std::sync::Arc;
|
||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId,
|
||||
NetworkConfiguration as BasicNetworkConfiguration, NonReservedPeerMode};
|
||||
use util::{TimerToken, U256, H256, UtilError, Secret, Populatable};
|
||||
use ethcore::client::{Client, ChainNotify};
|
||||
use io::NetSyncIo;
|
||||
use chain::{ChainSync, SyncStatus};
|
||||
use std::net::{SocketAddr, AddrParseError};
|
||||
use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
|
||||
use std::mem;
|
||||
use std::collections::VecDeque;
|
||||
use parking_lot::RwLock;
|
||||
|
||||
/// Ethereum sync protocol
|
||||
pub const ETH_PROTOCOL: &'static str = "eth";
|
||||
|
||||
/// Sync configuration
|
||||
pub struct SyncConfig {
|
||||
/// Max blocks to download ahead
|
||||
pub max_download_ahead_blocks: usize,
|
||||
/// Network ID
|
||||
pub network_id: U256,
|
||||
}
|
||||
|
||||
impl Default for SyncConfig {
|
||||
fn default() -> SyncConfig {
|
||||
SyncConfig {
|
||||
max_download_ahead_blocks: 20000,
|
||||
network_id: U256::from(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
binary_fixed_size!(SyncConfig);
|
||||
binary_fixed_size!(SyncStatus);
|
||||
|
||||
/// Current sync status
|
||||
pub trait SyncProvider: Send + Sync {
|
||||
/// Get sync status
|
||||
fn status(&self) -> SyncStatus;
|
||||
}
|
||||
|
||||
/// Ethereum network protocol handler
|
||||
pub struct EthSync {
|
||||
/// Network service
|
||||
network: NetworkService,
|
||||
/// Protocol handler
|
||||
handler: Arc<SyncProtocolHandler>,
|
||||
}
|
||||
|
||||
impl EthSync {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn new(config: SyncConfig, chain: Arc<Client>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, UtilError> {
|
||||
let chain_sync = ChainSync::new(config, chain.deref());
|
||||
let service = try!(NetworkService::new(try!(network_config.into_basic())));
|
||||
let sync = Arc::new(EthSync{
|
||||
network: service,
|
||||
handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }),
|
||||
});
|
||||
|
||||
Ok(sync)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Ipc)]
|
||||
#[ipc(client_ident="SyncClient")]
|
||||
impl SyncProvider for EthSync {
|
||||
/// Get sync status
|
||||
fn status(&self) -> SyncStatus {
|
||||
self.handler.sync.write().status()
|
||||
}
|
||||
}
|
||||
|
||||
struct SyncProtocolHandler {
|
||||
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
||||
chain: Arc<Client>,
|
||||
/// Sync strategy
|
||||
sync: RwLock<ChainSync>,
|
||||
}
|
||||
|
||||
impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
fn initialize(&self, io: &NetworkContext) {
|
||||
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()), *peer, packet_id, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainNotify for EthSync {
|
||||
fn new_blocks(&self,
|
||||
imported: Vec<H256>,
|
||||
invalid: Vec<H256>,
|
||||
enacted: Vec<H256>,
|
||||
retracted: Vec<H256>,
|
||||
sealed: Vec<H256>)
|
||||
{
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref());
|
||||
self.handler.sync.write().chain_new_blocks(
|
||||
&mut sync_io,
|
||||
&imported,
|
||||
&invalid,
|
||||
&enacted,
|
||||
&retracted,
|
||||
&sealed);
|
||||
});
|
||||
}
|
||||
|
||||
fn start(&self) {
|
||||
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
||||
self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
impl IpcConfig<ManageNetwork> for Arc<ManageNetwork> { }
|
||||
impl IpcConfig<SyncProvider> for Arc<SyncProvider> { }
|
||||
|
||||
/// Trait for managing network
|
||||
pub trait ManageNetwork : Send + Sync {
|
||||
/// Set to allow unreserved peers to connect
|
||||
fn accept_unreserved_peers(&self);
|
||||
/// Set to deny unreserved peers to connect
|
||||
fn deny_unreserved_peers(&self);
|
||||
/// Remove reservation for the peer
|
||||
fn remove_reserved_peer(&self, peer: String) -> Result<(), String>;
|
||||
/// Add reserved peer
|
||||
fn add_reserved_peer(&self, peer: String) -> Result<(), String>;
|
||||
/// Start network
|
||||
fn start_network(&self);
|
||||
/// Stop network
|
||||
fn stop_network(&self);
|
||||
/// Query the current configuration of the network
|
||||
fn network_config(&self) -> NetworkConfiguration;
|
||||
}
|
||||
|
||||
|
||||
#[derive(Ipc)]
|
||||
#[ipc(client_ident="NetworkManagerClient")]
|
||||
impl ManageNetwork for EthSync {
|
||||
fn accept_unreserved_peers(&self) {
|
||||
self.network.set_non_reserved_mode(NonReservedPeerMode::Accept);
|
||||
}
|
||||
|
||||
fn deny_unreserved_peers(&self) {
|
||||
self.network.set_non_reserved_mode(NonReservedPeerMode::Deny);
|
||||
}
|
||||
|
||||
fn remove_reserved_peer(&self, peer: String) -> Result<(), String> {
|
||||
self.network.remove_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn add_reserved_peer(&self, peer: String) -> Result<(), String> {
|
||||
self.network.add_reserved_peer(&peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn start_network(&self) {
|
||||
self.start();
|
||||
}
|
||||
|
||||
fn stop_network(&self) {
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref());
|
||||
self.handler.sync.write().abort(&mut sync_io);
|
||||
});
|
||||
self.stop();
|
||||
}
|
||||
|
||||
fn network_config(&self) -> NetworkConfiguration {
|
||||
NetworkConfiguration::from(self.network.config().clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Binary, Debug, Clone)]
|
||||
/// Network service configuration
|
||||
pub struct NetworkConfiguration {
|
||||
/// Directory path to store network configuration. None means nothing will be saved
|
||||
pub config_path: Option<String>,
|
||||
/// IP address to listen for incoming connections. Listen to all connections by default
|
||||
pub listen_address: Option<String>,
|
||||
/// IP address to advertise. Detected automatically if none.
|
||||
pub public_address: Option<String>,
|
||||
/// Port for UDP connections, same as TCP by default
|
||||
pub udp_port: Option<u16>,
|
||||
/// Enable NAT configuration
|
||||
pub nat_enabled: bool,
|
||||
/// Enable discovery
|
||||
pub discovery_enabled: bool,
|
||||
/// List of initial node addresses
|
||||
pub boot_nodes: Vec<String>,
|
||||
/// Use provided node key instead of default
|
||||
pub use_secret: Option<Secret>,
|
||||
/// Number of connected peers to maintain
|
||||
pub ideal_peers: u32,
|
||||
/// List of reserved node addresses.
|
||||
pub reserved_nodes: Vec<String>,
|
||||
/// The non-reserved peer mode.
|
||||
pub allow_non_reserved: bool,
|
||||
}
|
||||
|
||||
impl NetworkConfiguration {
|
||||
pub fn into_basic(self) -> Result<BasicNetworkConfiguration, AddrParseError> {
|
||||
use std::str::FromStr;
|
||||
|
||||
Ok(BasicNetworkConfiguration {
|
||||
config_path: self.config_path,
|
||||
listen_address: match self.listen_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) },
|
||||
public_address: match self.public_address { None => None, Some(addr) => Some(try!(SocketAddr::from_str(&addr))) },
|
||||
udp_port: self.udp_port,
|
||||
nat_enabled: self.nat_enabled,
|
||||
discovery_enabled: self.discovery_enabled,
|
||||
boot_nodes: self.boot_nodes,
|
||||
use_secret: self.use_secret,
|
||||
ideal_peers: self.ideal_peers,
|
||||
reserved_nodes: self.reserved_nodes,
|
||||
non_reserved_mode: if self.allow_non_reserved { NonReservedPeerMode::Accept } else { NonReservedPeerMode::Deny },
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BasicNetworkConfiguration> for NetworkConfiguration {
|
||||
fn from(other: BasicNetworkConfiguration) -> Self {
|
||||
NetworkConfiguration {
|
||||
config_path: other.config_path,
|
||||
listen_address: other.listen_address.and_then(|addr| Some(format!("{}", addr))),
|
||||
public_address: other.public_address.and_then(|addr| Some(format!("{}", addr))),
|
||||
udp_port: other.udp_port,
|
||||
nat_enabled: other.nat_enabled,
|
||||
discovery_enabled: other.discovery_enabled,
|
||||
boot_nodes: other.boot_nodes,
|
||||
use_secret: other.use_secret,
|
||||
ideal_peers: other.ideal_peers,
|
||||
reserved_nodes: other.reserved_nodes,
|
||||
allow_non_reserved: match other.non_reserved_mode { NonReservedPeerMode::Accept => true, _ => false } ,
|
||||
}
|
||||
}
|
||||
}
|
@ -14,8 +14,6 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
|
||||
///
|
||||
/// `BlockChain` synchronization strategy.
|
||||
/// Syncs to peers and keeps up to date.
|
||||
|
182
sync/src/lib.rs
182
sync/src/lib.rs
@ -34,10 +34,9 @@
|
||||
//! extern crate ethsync;
|
||||
//! use std::env;
|
||||
//! use std::sync::Arc;
|
||||
//! use util::network::{NetworkConfiguration};
|
||||
//! use util::io::IoChannel;
|
||||
//! use ethcore::client::{Client, ClientConfig};
|
||||
//! use ethsync::{EthSync, SyncConfig, ManageNetwork};
|
||||
//! use ethsync::{EthSync, SyncConfig, ManageNetwork, NetworkConfiguration};
|
||||
//! use ethcore::ethereum;
|
||||
//! use ethcore::miner::{GasPricer, Miner};
|
||||
//!
|
||||
@ -56,7 +55,7 @@
|
||||
//! miner,
|
||||
//! IoChannel::disconnected()
|
||||
//! ).unwrap();
|
||||
//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::new()).unwrap();
|
||||
//! let sync = EthSync::new(SyncConfig::default(), client, NetworkConfiguration::from(util::NetworkConfiguration::new())).unwrap();
|
||||
//! sync.start_network();
|
||||
//! }
|
||||
//! ```
|
||||
@ -71,14 +70,10 @@ extern crate time;
|
||||
extern crate rand;
|
||||
#[macro_use]
|
||||
extern crate heapsize;
|
||||
|
||||
use std::ops::*;
|
||||
use std::sync::Arc;
|
||||
use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, PeerId, NetworkConfiguration};
|
||||
use util::{TimerToken, U256, H256, RwLock, UtilError};
|
||||
use ethcore::client::{Client, ChainNotify};
|
||||
use io::NetSyncIo;
|
||||
use chain::ChainSync;
|
||||
#[macro_use]
|
||||
extern crate ethcore_ipc as ipc;
|
||||
extern crate semver;
|
||||
extern crate parking_lot;
|
||||
|
||||
mod chain;
|
||||
mod blocks;
|
||||
@ -87,166 +82,11 @@ mod io;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// Ethereum sync protocol
|
||||
pub const ETH_PROTOCOL: &'static str = "eth";
|
||||
|
||||
/// Sync configuration
|
||||
pub struct SyncConfig {
|
||||
/// Max blocks to download ahead
|
||||
pub max_download_ahead_blocks: usize,
|
||||
/// Network ID
|
||||
pub network_id: U256,
|
||||
mod api {
|
||||
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
|
||||
include!(concat!(env!("OUT_DIR"), "/api.ipc.rs"));
|
||||
}
|
||||
|
||||
impl Default for SyncConfig {
|
||||
fn default() -> SyncConfig {
|
||||
SyncConfig {
|
||||
max_download_ahead_blocks: 20000,
|
||||
network_id: U256::from(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
pub use api::{EthSync, SyncProvider, ManageNetwork, SyncConfig, NetworkConfiguration};
|
||||
pub use chain::{SyncStatus, SyncState};
|
||||
|
||||
/// Current sync status
|
||||
pub trait SyncProvider: Send + Sync {
|
||||
/// Get sync status
|
||||
fn status(&self) -> SyncStatus;
|
||||
}
|
||||
|
||||
/// Ethereum network protocol handler
|
||||
pub struct EthSync {
|
||||
/// Network service
|
||||
network: NetworkService,
|
||||
/// Protocol handler
|
||||
handler: Arc<SyncProtocolHandler>,
|
||||
}
|
||||
|
||||
pub use self::chain::{SyncStatus, SyncState};
|
||||
|
||||
impl EthSync {
|
||||
/// Creates and register protocol with the network service
|
||||
pub fn new(config: SyncConfig, chain: Arc<Client>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, UtilError> {
|
||||
let chain_sync = ChainSync::new(config, chain.deref());
|
||||
let service = try!(NetworkService::new(network_config));
|
||||
let sync = Arc::new(EthSync{
|
||||
network: service,
|
||||
handler: Arc::new(SyncProtocolHandler { sync: RwLock::new(chain_sync), chain: chain }),
|
||||
});
|
||||
|
||||
Ok(sync)
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncProvider for EthSync {
|
||||
/// Get sync status
|
||||
fn status(&self) -> SyncStatus {
|
||||
self.handler.sync.read().status()
|
||||
}
|
||||
}
|
||||
|
||||
struct SyncProtocolHandler {
|
||||
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
|
||||
chain: Arc<Client>,
|
||||
/// Sync strategy
|
||||
sync: RwLock<ChainSync>,
|
||||
}
|
||||
|
||||
impl NetworkProtocolHandler for SyncProtocolHandler {
|
||||
fn initialize(&self, io: &NetworkContext) {
|
||||
io.register_timer(0, 1000).expect("Error registering sync timer");
|
||||
}
|
||||
|
||||
fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]) {
|
||||
ChainSync::dispatch_packet(&self.sync, &mut NetSyncIo::new(io, self.chain.deref()), *peer, packet_id, data);
|
||||
}
|
||||
|
||||
fn connected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_connected(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||
}
|
||||
|
||||
fn disconnected(&self, io: &NetworkContext, peer: &PeerId) {
|
||||
self.sync.write().on_peer_aborting(&mut NetSyncIo::new(io, self.chain.deref()), *peer);
|
||||
}
|
||||
|
||||
fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
|
||||
self.sync.write().maintain_peers(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||
self.sync.write().maintain_sync(&mut NetSyncIo::new(io, self.chain.deref()));
|
||||
}
|
||||
}
|
||||
|
||||
impl ChainNotify for EthSync {
|
||||
fn new_blocks(&self,
|
||||
imported: Vec<H256>,
|
||||
invalid: Vec<H256>,
|
||||
enacted: Vec<H256>,
|
||||
retracted: Vec<H256>,
|
||||
sealed: Vec<H256>)
|
||||
{
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref());
|
||||
self.handler.sync.write().chain_new_blocks(
|
||||
&mut sync_io,
|
||||
&imported,
|
||||
&invalid,
|
||||
&enacted,
|
||||
&retracted,
|
||||
&sealed);
|
||||
});
|
||||
}
|
||||
|
||||
fn start(&self) {
|
||||
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
|
||||
self.network.register_protocol(self.handler.clone(), ETH_PROTOCOL, &[62u8, 63u8])
|
||||
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
|
||||
}
|
||||
|
||||
fn stop(&self) {
|
||||
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
|
||||
}
|
||||
}
|
||||
|
||||
/// Trait for managing network
|
||||
pub trait ManageNetwork : Send + Sync {
|
||||
/// Set mode for reserved peers (allow/deny peers that are unreserved)
|
||||
fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode);
|
||||
/// Remove reservation for the peer
|
||||
fn remove_reserved_peer(&self, peer: &str) -> Result<(), String>;
|
||||
/// Add reserved peer
|
||||
fn add_reserved_peer(&self, peer: &str) -> Result<(), String>;
|
||||
/// Start network
|
||||
fn start_network(&self);
|
||||
/// Stop network
|
||||
fn stop_network(&self);
|
||||
/// Query the current configuration of the network
|
||||
fn network_config(&self) -> NetworkConfiguration;
|
||||
}
|
||||
|
||||
impl ManageNetwork for EthSync {
|
||||
fn set_non_reserved_mode(&self, mode: ::util::network::NonReservedPeerMode) {
|
||||
self.network.set_non_reserved_mode(mode);
|
||||
}
|
||||
|
||||
fn remove_reserved_peer(&self, peer: &str) -> Result<(), String> {
|
||||
self.network.remove_reserved_peer(peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn add_reserved_peer(&self, peer: &str) -> Result<(), String> {
|
||||
self.network.add_reserved_peer(peer).map_err(|e| format!("{:?}", e))
|
||||
}
|
||||
|
||||
fn start_network(&self) {
|
||||
self.start();
|
||||
}
|
||||
|
||||
fn stop_network(&self) {
|
||||
self.network.with_context(ETH_PROTOCOL, |context| {
|
||||
let mut sync_io = NetSyncIo::new(context, self.handler.chain.deref());
|
||||
self.handler.sync.write().abort(&mut sync_io);
|
||||
});
|
||||
self.stop();
|
||||
}
|
||||
|
||||
fn network_config(&self) -> NetworkConfiguration {
|
||||
self.network.config().clone()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user