light: integrate with sync + serve_light CLI

This commit is contained in:
Robert Habermeier 2016-12-08 23:21:47 +01:00
parent efd66f566d
commit 6f5f1f5e26
20 changed files with 236 additions and 74 deletions

1
Cargo.lock generated
View File

@ -18,6 +18,7 @@ dependencies = [
"ethcore-ipc-hypervisor 1.2.0",
"ethcore-ipc-nano 1.4.0",
"ethcore-ipc-tests 0.1.0",
"ethcore-light 1.5.0",
"ethcore-logger 1.5.0",
"ethcore-rpc 1.5.0",
"ethcore-signer 1.5.0",

View File

@ -47,6 +47,7 @@ rlp = { path = "util/rlp" }
ethcore-stratum = { path = "stratum" }
ethcore-dapps = { path = "dapps", optional = true }
clippy = { version = "0.0.103", optional = true}
ethcore-light = { path = "ethcore/light" }
[target.'cfg(windows)'.dependencies]
winapi = "0.2"

View File

@ -20,6 +20,7 @@ extern crate ethcore_ipc_codegen;
#[cfg(feature = "ipc")]
fn main() {
ethcore_ipc_codegen::derive_binary("src/types/mod.rs.in").unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/provider.rs", true).unwrap();
}
#[cfg(not(feature = "ipc"))]

View File

@ -33,8 +33,21 @@
pub mod client;
pub mod net;
#[cfg(not(feature = "ipc"))]
pub mod provider;
#[cfg(feature = "ipc")]
pub mod provider {
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/provider.rs"));
}
#[cfg(feature = "ipc")]
pub mod remote {
pub use provider::LightProviderClient;
}
mod types;
pub use self::provider::Provider;

View File

@ -22,6 +22,9 @@
//!
//! This module provides an interface for configuration of buffer
//! flow costs and recharge rates.
//!
//! Current default costs are picked completely arbitrarily, not based
//! on any empirical timings or mathematical models.
use request;
use super::packet;
@ -273,6 +276,16 @@ impl FlowParams {
}
}
impl Default for FlowParams {
fn default() -> Self {
FlowParams {
limit: 50_000_000.into(),
costs: CostTable::default(),
recharge: 100_000.into(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -30,13 +30,14 @@ use util::{Bytes, Mutex, RwLock, U256};
use time::SteadyTime;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use provider::Provider;
use request::{self, Request};
use self::buffer_flow::{Buffer, FlowParams};
use self::context::{IoContext, EventContext, Ctx};
use self::context::Ctx;
use self::error::{Error, Punishment};
mod buffer_flow;
@ -47,16 +48,20 @@ mod status;
#[cfg(test)]
mod tests;
pub use self::status::{Status, Capabilities, Announcement, NetworkId};
pub use self::context::{EventContext, IoContext};
pub use self::status::{Status, Capabilities, Announcement};
const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL_MS: u64 = 1000;
// LPV1
const PROTOCOL_VERSION: u32 = 1;
// Supported protocol versions.
pub const PROTOCOL_VERSIONS: &'static [u8] = &[1];
// TODO [rob] make configurable.
const PROTOCOL_ID: [u8; 3] = *b"les";
// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;
// Packet count for LES.
pub const PACKET_COUNT: u8 = 15;
// packet ID definitions.
mod packet {
@ -173,6 +178,8 @@ pub trait Handler: Send + Sync {
/// Called when a peer responds with header proofs. Each proof is a block header coupled
/// with a series of trie nodes is ascending order by distance from the root.
fn on_header_proofs(&self, _ctx: &EventContext, _req_id: ReqId, _proofs: &[(Bytes, Vec<Bytes>)]) { }
/// Called on abort.
fn on_abort(&self) { }
}
// a request, the peer who it was made to, and the time it was made.
@ -185,7 +192,7 @@ struct Requested {
/// Protocol parameters.
pub struct Params {
/// Network id.
pub network_id: NetworkId,
pub network_id: u64,
/// Buffer flow parameters.
pub flow_params: FlowParams,
/// Initial capabilities.
@ -203,9 +210,9 @@ pub struct Params {
// Locks must be acquired in the order declared, and when holding a read lock
// on the peers, only one peer may be held at a time.
pub struct LightProtocol {
provider: Box<Provider>,
provider: Arc<Provider>,
genesis_hash: H256,
network_id: NetworkId,
network_id: u64,
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
peers: RwLock<HashMap<PeerId, Mutex<Peer>>>,
pending_requests: RwLock<HashMap<usize, Requested>>,
@ -217,7 +224,7 @@ pub struct LightProtocol {
impl LightProtocol {
/// Create a new instance of the protocol manager.
pub fn new(provider: Box<Provider>, params: Params) -> Self {
pub fn new(provider: Arc<Provider>, params: Params) -> Self {
let genesis_hash = provider.chain_info().genesis_hash;
LightProtocol {
provider: provider,
@ -323,6 +330,23 @@ impl LightProtocol {
self.handlers.push(handler);
}
/// Signal to handlers that network activity is being aborted
/// and clear peer data.
pub fn abort(&self) {
for handler in &self.handlers {
handler.on_abort();
}
// acquire in order and hold.
let mut pending_peers = self.pending_peers.write();
let mut peers = self.peers.write();
let mut pending_requests = self.pending_requests.write();
pending_peers.clear();
peers.clear();
pending_requests.clear();
}
// Does the common pre-verification of responses before the response itself
// is actually decoded:
// - check whether peer exists
@ -460,7 +484,7 @@ impl LightProtocol {
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: PROTOCOL_VERSION,
protocol_version: MAX_PROTOCOL_VERSION as u32,
network_id: self.network_id,
last_head: None,
};

View File

@ -82,26 +82,6 @@ impl Key {
}
}
/// Network ID structure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum NetworkId {
/// ID for the mainnet
Mainnet = 1,
/// ID for the testnet
Testnet = 0,
}
impl NetworkId {
fn from_raw(raw: u32) -> Option<Self> {
match raw {
0 => Some(NetworkId::Testnet),
1 => Some(NetworkId::Mainnet),
_ => None,
}
}
}
// helper for decoding key-value pairs in the handshake or an announcement.
struct Parser<'a> {
pos: usize,
@ -164,7 +144,7 @@ pub struct Status {
/// Protocol version.
pub protocol_version: u32,
/// Network id of this peer.
pub network_id: NetworkId,
pub network_id: u64,
/// Total difficulty of the head of the chain.
pub head_td: U256,
/// Hash of the best block.
@ -225,8 +205,7 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP
let status = Status {
protocol_version: try!(parser.expect(Key::ProtocolVersion)),
network_id: try!(parser.expect(Key::NetworkId)
.and_then(|id: u32| NetworkId::from_raw(id).ok_or(DecoderError::Custom("Invalid network ID")))),
network_id: try!(parser.expect(Key::NetworkId)),
head_td: try!(parser.expect(Key::HeadTD)),
head_hash: try!(parser.expect(Key::HeadHash)),
head_num: try!(parser.expect(Key::HeadNum)),
@ -254,7 +233,7 @@ pub fn parse_handshake(rlp: UntrustedRlp) -> Result<(Status, Capabilities, FlowP
pub fn write_handshake(status: &Status, capabilities: &Capabilities, flow_params: &FlowParams) -> Vec<u8> {
let mut pairs = Vec::new();
pairs.push(encode_pair(Key::ProtocolVersion, &status.protocol_version));
pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u32)));
pairs.push(encode_pair(Key::NetworkId, &(status.network_id as u64)));
pairs.push(encode_pair(Key::HeadTD, &status.head_td));
pairs.push(encode_pair(Key::HeadHash, &status.head_hash));
pairs.push(encode_pair(Key::HeadNum, &status.head_num));
@ -385,7 +364,7 @@ mod tests {
fn full_handshake() {
let status = Status {
protocol_version: 1,
network_id: NetworkId::Mainnet,
network_id: 1,
head_td: U256::default(),
head_hash: H256::default(),
head_num: 10,
@ -420,7 +399,7 @@ mod tests {
fn partial_handshake() {
let status = Status {
protocol_version: 1,
network_id: NetworkId::Mainnet,
network_id: 1,
head_td: U256::default(),
head_hash: H256::default(),
head_num: 10,
@ -455,7 +434,7 @@ mod tests {
fn skip_unknown_keys() {
let status = Status {
protocol_version: 1,
network_id: NetworkId::Mainnet,
network_id: 1,
head_td: U256::default(),
head_hash: H256::default(),
head_num: 10,

View File

@ -25,7 +25,7 @@ use network::PeerId;
use net::buffer_flow::FlowParams;
use net::context::IoContext;
use net::status::{Capabilities, Status, NetworkId, write_handshake};
use net::status::{Capabilities, Status, write_handshake};
use net::{encode_request, LightProtocol, Params, packet};
use provider::Provider;
use request::{self, Request, Headers};
@ -174,8 +174,8 @@ fn setup(flow_params: FlowParams, capabilities: Capabilities) -> (Arc<TestProvid
client: TestBlockChainClient::new(),
});
let proto = LightProtocol::new(Box::new(TestProvider(provider.clone())), Params {
network_id: NetworkId::Testnet,
let proto = LightProtocol::new(Arc::new(TestProvider(provider.clone())), Params {
network_id: 2,
flow_params: flow_params,
capabilities: capabilities,
});
@ -185,8 +185,8 @@ fn setup(flow_params: FlowParams, capabilities: Capabilities) -> (Arc<TestProvid
fn status(chain_info: BlockChainInfo) -> Status {
Status {
protocol_version: ::net::PROTOCOL_VERSION,
network_id: NetworkId::Testnet,
protocol_version: 1,
network_id: 2,
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,

View File

@ -33,6 +33,7 @@ use request;
/// or empty vector where appropriate.
///
/// [1]: https://github.com/ethcore/parity/wiki/Light-Ethereum-Subprotocol-(LES)
#[cfg_attr(feature = "ipc", ipc(client_ident="LightProviderClient"))]
pub trait Provider: Send + Sync {
/// Provide current blockchain info.
fn chain_info(&self) -> BlockChainInfo;

View File

@ -32,6 +32,7 @@ warp = true
allow_ips = "all"
snapshot_peers = 0
max_pending_peers = 64
serve_light = true
reserved_only = false
reserved_peers = "./path_to_file"

View File

@ -135,6 +135,8 @@ usage! {
flag_reserved_only: bool = false,
or |c: &Config| otry!(c.network).reserved_only.clone(),
flag_no_ancient_blocks: bool = false, or |_| None,
flag_serve_light: bool = false,
or |c: &Config| otry!(c.network).serve_light.clone(),
// -- API and Console Options
// RPC
@ -334,6 +336,7 @@ struct Network {
node_key: Option<String>,
reserved_peers: Option<String>,
reserved_only: Option<bool>,
serve_light: Option<bool>,
}
#[derive(Default, Debug, PartialEq, RustcDecodable)]
@ -543,6 +546,7 @@ mod tests {
flag_reserved_peers: Some("./path_to_file".into()),
flag_reserved_only: false,
flag_no_ancient_blocks: false,
flag_serve_light: true,
// -- API and Console Options
// RPC
@ -713,6 +717,7 @@ mod tests {
node_key: None,
reserved_peers: Some("./path/to/reserved_peers".into()),
reserved_only: Some(true),
serve_light: None,
}),
rpc: Some(Rpc {
disable: Some(true),

View File

@ -97,6 +97,7 @@ Networking Options:
--max-pending-peers NUM Allow up to NUM pending connections. (default: {flag_max_pending_peers})
--no-ancient-blocks Disable downloading old blocks after snapshot restoration
or warp sync. (default: {flag_no_ancient_blocks})
--serve-light Experimental: Serve light client peers. (default: {flag_serve_light})
API and Console Options:
--no-jsonrpc Disable the JSON-RPC API server. (default: {flag_no_jsonrpc})

View File

@ -275,6 +275,7 @@ impl Configuration {
no_periodic_snapshot: self.args.flag_no_periodic_snapshot,
check_seal: !self.args.flag_no_seal_check,
download_old_blocks: !self.args.flag_no_ancient_blocks,
serve_light: self.args.flag_serve_light,
};
Cmd::Run(run_cmd)
};
@ -921,6 +922,7 @@ mod tests {
no_periodic_snapshot: false,
check_seal: true,
download_old_blocks: true,
serve_light: false,
}));
}

View File

@ -43,6 +43,7 @@ extern crate serde;
extern crate serde_json;
extern crate rlp;
extern crate ethcore_hash_fetch as hash_fetch;
extern crate ethcore_light as light;
extern crate ethcore_ipc_hypervisor as hypervisor;
extern crate ethcore_rpc;

View File

@ -15,16 +15,22 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use std::sync::Arc;
use std::path::Path;
use ethcore::client::BlockChainClient;
use hypervisor::Hypervisor;
use ethsync::{SyncConfig, NetworkConfiguration, NetworkError};
use ethsync::{SyncConfig, NetworkConfiguration, NetworkError, Params};
use ethcore::snapshot::SnapshotService;
use light::Provider;
#[cfg(not(feature="ipc"))]
use self::no_ipc_deps::*;
#[cfg(not(feature="ipc"))]
use ethcore_logger::Config as LogConfig;
#[cfg(feature="ipc")]
use self::ipc_deps::*;
use ethcore_logger::Config as LogConfig;
use std::path::Path;
#[cfg(feature="ipc")]
pub mod service_urls {
@ -36,6 +42,8 @@ pub mod service_urls {
pub const SYNC_NOTIFY: &'static str = "parity-sync-notify.ipc";
pub const NETWORK_MANAGER: &'static str = "parity-manage-net.ipc";
pub const SYNC_CONTROL: &'static str = "parity-sync-control.ipc";
pub const LIGHT_PROVIDER: &'static str = "parity-light-provider.ipc";
#[cfg(feature="stratum")]
pub const STRATUM: &'static str = "parity-stratum.ipc";
#[cfg(feature="stratum")]
@ -75,6 +83,7 @@ mod ipc_deps {
pub use nanoipc::{GuardedSocket, NanoSocket, generic_client, fast_client};
pub use ipc::IpcSocket;
pub use ipc::binary::serialize;
pub use light::remote::LightProviderClient;
}
#[cfg(feature="ipc")]
@ -124,6 +133,7 @@ pub fn sync
net_cfg: NetworkConfiguration,
_client: Arc<BlockChainClient>,
_snapshot_service: Arc<SnapshotService>,
_provider: Arc<Provider>,
log_settings: &LogConfig,
)
-> Result<SyncModules, NetworkError>
@ -141,7 +151,9 @@ pub fn sync
&service_urls::with_base(&hypervisor.io_path, service_urls::SYNC_NOTIFY)).unwrap();
let manage_client = generic_client::<NetworkManagerClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::NETWORK_MANAGER)).unwrap();
let provider_client = generic_client::<LightProviderClient<_>>(
&service_urls::with_base(&hypervisor.io_path, service_urls::LIGHT_PROVIDER)).unwrap();
*hypervisor_ref = Some(hypervisor);
Ok((sync_client, manage_client, notify_client))
}
@ -154,10 +166,18 @@ pub fn sync
net_cfg: NetworkConfiguration,
client: Arc<BlockChainClient>,
snapshot_service: Arc<SnapshotService>,
provider: Arc<Provider>,
_log_settings: &LogConfig,
)
-> Result<SyncModules, NetworkError>
{
let eth_sync = try!(EthSync::new(sync_cfg, client, snapshot_service, net_cfg));
let eth_sync = try!(EthSync::new(Params {
config: sync_cfg,
chain: client,
provider: provider,
snapshot_service: snapshot_service,
network_config: net_cfg,
}));
Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>))
}

View File

@ -92,6 +92,7 @@ pub struct RunCmd {
pub no_periodic_snapshot: bool,
pub check_seal: bool,
pub download_old_blocks: bool,
pub serve_light: bool,
}
pub fn open_ui(dapps_conf: &dapps::Configuration, signer_conf: &signer::Configuration) -> Result<(), String> {
@ -185,6 +186,11 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
);
info!("Operating mode: {}", Colour::White.bold().paint(format!("{}", mode)));
if cmd.serve_light {
info!("Configured to serve light client peers. Please note this feature is {}.",
Colour::White.bold().paint("experimental".to_string()));
}
// display warning about using experimental journaldb alorithm
if !algorithm.is_stable() {
warn!("Your chosen strategy is {}! You can re-run with --pruning to change.", Colour::Red.bold().paint("unstable"));
@ -204,6 +210,7 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
sync_config.fork_block = spec.fork_block();
sync_config.warp_sync = cmd.warp_sync;
sync_config.download_old_blocks = cmd.download_old_blocks;
sync_config.serve_light = cmd.serve_light;
// prepare account provider
let account_provider = Arc::new(try!(prepare_account_provider(&cmd.dirs, cmd.acc_conf)));
@ -268,7 +275,13 @@ pub fn execute(cmd: RunCmd, logger: Arc<RotatingLogger>) -> Result<(), String> {
// create sync object
let (sync_provider, manage_network, chain_notify) = try!(modules::sync(
&mut hypervisor, sync_config, net_conf.into(), client.clone(), snapshot_service.clone(), &cmd.logger_config,
&mut hypervisor,
sync_config,
net_conf.into(),
client.clone(),
snapshot_service.clone(),
client.clone(),
&cmd.logger_config,
).map_err(|e| format!("Sync error: {}", e)));
service.add_notify(chain_notify.clone());

View File

@ -22,6 +22,7 @@ use hypervisor::{SYNC_MODULE_ID, HYPERVISOR_IPC_URL, ControlService};
use ethcore::client::ChainNotify;
use ethcore::client::remote::RemoteClient;
use ethcore::snapshot::remote::RemoteSnapshotService;
use light::remote::LightProviderClient;
use ethsync::{SyncProvider, EthSync, ManageNetwork, ServiceConfiguration};
use modules::service_urls;
use boot;
@ -48,8 +49,15 @@ pub fn main() {
let remote_client = dependency!(RemoteClient, &service_urls::with_base(&service_config.io_path, service_urls::CLIENT));
let remote_snapshot = dependency!(RemoteSnapshotService, &service_urls::with_base(&service_config.io_path, service_urls::SNAPSHOT));
let remote_provider = dependency!(LightProviderClient, &service_urls::with_base(&service_config.io_path, service_urls::LIGHT_PROVIDER));
let sync = EthSync::new(service_config.sync, remote_client.service().clone(), remote_snapshot.service().clone(), service_config.net).unwrap();
let sync = EthSync::new(Params {
config: service_config.sync,
chain: remote_client.service().clone(),
snapshot_service: remote_snapshot.service().clone(),
provider: remote_provider.service().clone(),
network_config: service_config.net
}).unwrap();
let _ = boot::main_thread();
let service_stop = Arc::new(AtomicBool::new(false));

View File

@ -16,6 +16,10 @@
extern crate ethcore_ipc_codegen;
#[cfg(feature = "ipc")]
fn main() {
ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", cfg!(feature="ipc")).unwrap();
ethcore_ipc_codegen::derive_ipc_cond("src/api.rs", true).unwrap();
}
#[cfg(not(feature = "ipc"))]
fn main() {}

View File

@ -33,6 +33,7 @@ use ipc::{BinaryConvertable, BinaryConvertError, IpcConfig};
use std::str::FromStr;
use parking_lot::RwLock;
use chain::{ETH_PACKET_COUNT, SNAPSHOT_SYNC_PACKET_COUNT};
use light::net::{LightProtocol, Params as LightParams, Capabilities, Handler as LightHandler, EventContext};
pub const WARP_SYNC_PROTOCOL_ID: ProtocolId = *b"par";
@ -47,10 +48,14 @@ pub struct SyncConfig {
pub network_id: u64,
/// Main "eth" subprotocol name.
pub subprotocol_name: [u8; 3],
/// Light "les" subprotocol name.
pub light_subprotocol_name: [u8; 3],
/// Fork block to check
pub fork_block: Option<(BlockNumber, H256)>,
/// Enable snapshot sync
pub warp_sync: bool,
/// Enable light client server.
pub serve_light: bool,
}
impl Default for SyncConfig {
@ -60,8 +65,10 @@ impl Default for SyncConfig {
download_old_blocks: true,
network_id: 1,
subprotocol_name: *b"eth",
light_subprotocol_name: *b"les",
fork_block: None,
warp_sync: false,
serve_light: false,
}
}
}
@ -116,30 +123,74 @@ pub struct PeerInfo {
pub eth_difficulty: Option<U256>,
}
/// EthSync initialization parameters.
#[cfg_attr(feature = "ipc", derive(Binary))]
pub struct Params {
/// Configuration.
pub config: SyncConfig,
/// Blockchain client.
pub chain: Arc<BlockChainClient>,
/// Snapshot service.
pub snapshot_service: Arc<SnapshotService>,
/// Light data provider.
pub provider: Arc<::light::Provider>,
/// Network layer configuration.
pub network_config: NetworkConfiguration,
}
/// Ethereum network protocol handler
pub struct EthSync {
/// Network service
network: NetworkService,
/// Protocol handler
handler: Arc<SyncProtocolHandler>,
/// Main (eth/par) protocol handler
sync_handler: Arc<SyncProtocolHandler>,
/// Light (les) protocol handler
light_proto: Option<Arc<LightProtocol>>,
/// The main subprotocol name
subprotocol_name: [u8; 3],
/// Light subprotocol name.
light_subprotocol_name: [u8; 3],
}
impl EthSync {
/// Creates and register protocol with the network service
pub fn new(config: SyncConfig, chain: Arc<BlockChainClient>, snapshot_service: Arc<SnapshotService>, network_config: NetworkConfiguration) -> Result<Arc<EthSync>, NetworkError> {
let chain_sync = ChainSync::new(config, &*chain);
let service = try!(NetworkService::new(try!(network_config.clone().into_basic())));
let sync = Arc::new(EthSync{
pub fn new(params: Params) -> Result<Arc<EthSync>, NetworkError> {
let pruning_info = params.chain.pruning_info();
let light_proto = match params.config.serve_light {
false => None,
true => Some({
let light_params = LightParams {
network_id: params.config.network_id,
flow_params: Default::default(),
capabilities: Capabilities {
serve_headers: true,
serve_chain_since: Some(pruning_info.earliest_chain),
serve_state_since: Some(pruning_info.earliest_state),
tx_relay: true,
},
};
let mut light_proto = LightProtocol::new(params.provider, light_params);
light_proto.add_handler(Box::new(TxRelay(params.chain.clone())));
Arc::new(light_proto)
})
};
let chain_sync = ChainSync::new(params.config, &*params.chain);
let service = try!(NetworkService::new(try!(params.network_config.clone().into_basic())));
let sync = Arc::new(EthSync {
network: service,
handler: Arc::new(SyncProtocolHandler {
sync_handler: Arc::new(SyncProtocolHandler {
sync: RwLock::new(chain_sync),
chain: chain,
snapshot_service: snapshot_service,
chain: params.chain,
snapshot_service: params.snapshot_service,
overlay: RwLock::new(HashMap::new()),
}),
subprotocol_name: config.subprotocol_name,
light_proto: light_proto,
subprotocol_name: params.config.subprotocol_name,
light_subprotocol_name: params.config.light_subprotocol_name,
});
Ok(sync)
@ -150,14 +201,15 @@ impl EthSync {
impl SyncProvider for EthSync {
/// Get sync status
fn status(&self) -> SyncStatus {
self.handler.sync.write().status()
self.sync_handler.sync.write().status()
}
/// Get sync peers
fn peers(&self) -> Vec<PeerInfo> {
// TODO: [rob] LES peers/peer info
self.network.with_context_eval(self.subprotocol_name, |context| {
let sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().peers(&sync_io)
let sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.sync_handler.sync.write().peers(&sync_io)
}).unwrap_or(Vec::new())
}
@ -166,7 +218,7 @@ impl SyncProvider for EthSync {
}
fn transactions_stats(&self) -> BTreeMap<H256, TransactionStats> {
let sync = self.handler.sync.read();
let sync = self.sync_handler.sync.read();
sync.transactions_stats()
.iter()
.map(|(hash, stats)| (*hash, stats.into()))
@ -228,8 +280,8 @@ impl ChainNotify for EthSync {
_duration: u64)
{
self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().chain_new_blocks(
let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.sync_handler.sync.write().chain_new_blocks(
&mut sync_io,
&imported,
&invalid,
@ -245,19 +297,36 @@ impl ChainNotify for EthSync {
Err(err) => warn!("Error starting network: {}", err),
_ => {},
}
self.network.register_protocol(self.handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
self.network.register_protocol(self.sync_handler.clone(), self.subprotocol_name, ETH_PACKET_COUNT, &[62u8, 63u8])
.unwrap_or_else(|e| warn!("Error registering ethereum protocol: {:?}", e));
// register the warp sync subprotocol
self.network.register_protocol(self.handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
self.network.register_protocol(self.sync_handler.clone(), WARP_SYNC_PROTOCOL_ID, SNAPSHOT_SYNC_PACKET_COUNT, &[1u8])
.unwrap_or_else(|e| warn!("Error registering snapshot sync protocol: {:?}", e));
// register the light protocol.
if let Some(light_proto) = self.light_proto.as_ref().map(|x| x.clone()) {
self.network.register_protocol(light_proto, self.light_subprotocol_name, ::light::net::PACKET_COUNT, ::light::net::PROTOCOL_VERSIONS)
.unwrap_or_else(|e| warn!("Error registering light client protocol: {:?}", e));
}
}
fn stop(&self) {
self.handler.snapshot_service.abort_restore();
self.sync_handler.snapshot_service.abort_restore();
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
}
}
/// LES event handler.
/// Simply queues transactions from light client peers.
struct TxRelay(Arc<BlockChainClient>);
impl LightHandler for TxRelay {
fn on_transactions(&self, ctx: &EventContext, relay: &[::ethcore::transaction::SignedTransaction]) {
trace!(target: "les", "Relaying {} transactions from peer {}", relay.len(), ctx.peer());
self.0.queue_transactions(relay.iter().map(|tx| ::rlp::encode(tx).to_vec()).collect())
}
}
impl IpcConfig for ManageNetwork { }
impl IpcConfig for SyncProvider { }
@ -304,9 +373,14 @@ impl ManageNetwork for EthSync {
fn stop_network(&self) {
self.network.with_context(self.subprotocol_name, |context| {
let mut sync_io = NetSyncIo::new(context, &*self.handler.chain, &*self.handler.snapshot_service, &self.handler.overlay);
self.handler.sync.write().abort(&mut sync_io);
let mut sync_io = NetSyncIo::new(context, &*self.sync_handler.chain, &*self.sync_handler.snapshot_service, &self.sync_handler.overlay);
self.sync_handler.sync.write().abort(&mut sync_io);
});
if let Some(light_proto) = self.light_proto.as_ref() {
light_proto.abort();
}
self.stop();
}
@ -452,4 +526,4 @@ pub struct ServiceConfiguration {
pub net: NetworkConfiguration,
/// IPC path.
pub io_path: String,
}
}

View File

@ -67,7 +67,7 @@ mod api {
#[cfg(not(feature = "ipc"))]
mod api;
pub use api::{EthSync, SyncProvider, ManageNetwork, SyncConfig,
pub use api::{EthSync, Params, SyncProvider, ManageNetwork, SyncConfig,
ServiceConfiguration, NetworkConfiguration, PeerInfo, AllowIP, TransactionStats};
pub use chain::{SyncStatus, SyncState};
pub use network::{is_valid_node_url, NonReservedPeerMode, NetworkError};