Merge branch 'master' into renamefield

This commit is contained in:
Gav Wood 2016-06-18 15:05:43 +02:00
commit d416e5d9bc
26 changed files with 376 additions and 154 deletions

20
Cargo.lock generated
View File

@ -286,8 +286,6 @@ dependencies = [
"mime_guess 1.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-dapps-rs.git)",
"parity-dapps-builtins 0.5.1 (git+https://github.com/ethcore/parity-dapps-builtins-rs.git)",
"parity-dapps-dao 0.4.0 (git+https://github.com/ethcore/parity-dapps-dao-rs.git)",
"parity-dapps-makerotc 0.3.0 (git+https://github.com/ethcore/parity-dapps-makerotc-rs.git)",
"parity-dapps-status 0.5.0 (git+https://github.com/ethcore/parity-dapps-status-rs.git)",
"parity-dapps-wallet 0.6.1 (git+https://github.com/ethcore/parity-dapps-wallet-rs.git)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
@ -917,26 +915,10 @@ dependencies = [
"parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-dapps-rs.git)",
]
[[package]]
name = "parity-dapps-dao"
version = "0.4.0"
source = "git+https://github.com/ethcore/parity-dapps-dao-rs.git#dd6b9ca7c18fbfa714183a4f570bd75b8391c13d"
dependencies = [
"parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-dapps-rs.git)",
]
[[package]]
name = "parity-dapps-makerotc"
version = "0.3.0"
source = "git+https://github.com/ethcore/parity-dapps-makerotc-rs.git#33568ac7209aa765c498bb2322e848f552656303"
dependencies = [
"parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-dapps-rs.git)",
]
[[package]]
name = "parity-dapps-status"
version = "0.5.0"
source = "git+https://github.com/ethcore/parity-dapps-status-rs.git#53e159f52013be5d2e8ba7eca35f605ad6e3bfa9"
source = "git+https://github.com/ethcore/parity-dapps-status-rs.git#0cdd3512004e403aff7da3b8c16ba0bf5d6c911c"
dependencies = [
"parity-dapps 0.3.0 (git+https://github.com/ethcore/parity-dapps-rs.git)",
]

View File

@ -36,6 +36,6 @@ syntex = "*"
[features]
default = ["serde_codegen", "extra-dapps"]
extra-dapps = ["parity-dapps-wallet", "parity-dapps-dao", "parity-dapps-makerotc"]
extra-dapps = ["parity-dapps-wallet"]
nightly = ["serde_macros"]
dev = ["clippy", "ethcore-rpc/dev", "ethcore-util/dev"]

View File

@ -41,6 +41,10 @@ pub enum SyncMessage {
NewChainHead,
/// A block is ready
BlockVerified,
/// Start network command.
StartNetwork,
/// Stop network command.
StopNetwork,
}
/// IO Message type used for Network service
@ -48,17 +52,20 @@ pub type NetSyncMessage = NetworkIoMessage<SyncMessage>;
/// Client service setup. Creates and registers client and network services with the IO subsystem.
pub struct ClientService {
net_service: NetworkService<SyncMessage>,
net_service: Arc<NetworkService<SyncMessage>>,
client: Arc<Client>,
panic_handler: Arc<PanicHandler>
}
impl ClientService {
/// Start the service in a separate thread.
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>) -> Result<ClientService, Error> {
pub fn start(config: ClientConfig, spec: Spec, net_config: NetworkConfiguration, db_path: &Path, miner: Arc<Miner>, enable_network: bool) -> Result<ClientService, Error> {
let panic_handler = PanicHandler::new_in_arc();
let mut net_service = try!(NetworkService::start(net_config));
let net_service = try!(NetworkService::new(net_config));
panic_handler.forward_from(&net_service);
if enable_network {
try!(net_service.start());
}
info!("Starting {}", net_service.host_info());
info!("Configured for {} using {:?} engine", spec.name, spec.engine.name());
@ -70,7 +77,7 @@ impl ClientService {
try!(net_service.io().register_handler(client_io));
Ok(ClientService {
net_service: net_service,
net_service: Arc::new(net_service),
client: client,
panic_handler: panic_handler,
})
@ -82,8 +89,8 @@ impl ClientService {
}
/// Get general IO interface
pub fn io(&mut self) -> &mut IoService<NetSyncMessage> {
self.net_service.io()
pub fn register_io_handler(&self, handler: Arc<IoHandler<NetSyncMessage> + Send>) -> Result<(), IoError> {
self.net_service.io().register_handler(handler)
}
/// Get client interface
@ -92,8 +99,8 @@ impl ClientService {
}
/// Get network service component
pub fn network(&mut self) -> &mut NetworkService<SyncMessage> {
&mut self.net_service
pub fn network(&mut self) -> Arc<NetworkService<SyncMessage>> {
self.net_service.clone()
}
}
@ -149,7 +156,7 @@ mod tests {
fn it_can_be_started() {
let spec = get_test_spec();
let temp_path = RandomTempPath::new();
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()));
let service = ClientService::start(ClientConfig::default(), spec, NetworkConfiguration::new_local(), &temp_path.as_path(), Arc::new(Miner::default()), false);
assert!(service.is_ok());
}
}

View File

@ -53,6 +53,7 @@ Account Options:
--no-import-keys Do not import keys from legacy clients.
Networking Options:
--no-network Disable p2p networking.
--port PORT Override the port on which the node should listen
[default: 30303].
--peers NUM Try to maintain that many peers [default: 25].
@ -268,6 +269,7 @@ pub struct Args {
pub flag_format: Option<String>,
pub flag_jitvm: bool,
pub flag_no_color: bool,
pub flag_no_network: bool,
// legacy...
pub flag_geth: bool,
pub flag_nodekey: Option<String>,

View File

@ -16,10 +16,10 @@
use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::NetSyncMessage;
use ethcore::service::{NetSyncMessage, SyncMessage};
use ethsync::EthSync;
use util::keys::store::AccountService;
use util::{TimerToken, IoHandler, IoContext};
use util::{TimerToken, IoHandler, IoContext, NetworkService, NetworkIoMessage};
use informant::Informant;
@ -33,6 +33,7 @@ pub struct ClientIoHandler {
pub sync: Arc<EthSync>,
pub accounts: Arc<AccountService>,
pub info: Informant,
pub network: Arc<NetworkService<SyncMessage>>,
}
impl IoHandler<NetSyncMessage> for ClientIoHandler {
@ -48,6 +49,21 @@ impl IoHandler<NetSyncMessage> for ClientIoHandler {
_ => {}
}
}
fn message(&self, _io: &IoContext<NetSyncMessage>, message: &NetSyncMessage) {
match *message {
NetworkIoMessage::User(SyncMessage::StartNetwork) => {
info!("Starting network");
self.network.start().unwrap_or_else(|e| warn!("Error starting network: {:?}", e));
EthSync::register(&*self.network, self.sync.clone()).unwrap_or_else(|e| warn!("Error registering eth protocol handler: {}", e));
},
NetworkIoMessage::User(SyncMessage::StopNetwork) => {
info!("Stopping network");
self.network.stop().unwrap_or_else(|e| warn!("Error stopping network: {:?}", e));
},
_ => {/* Ignore other messages */},
}
}
}

View File

@ -80,7 +80,7 @@ use std::thread::sleep;
use std::time::Duration;
use rustc_serialize::hex::FromHex;
use ctrlc::CtrlC;
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes};
use util::{H256, ToPretty, NetworkConfiguration, PayloadInfo, Bytes, UtilError};
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path};
use ethcore::error::{Error, ImportError};
@ -199,7 +199,7 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
// Build client
let mut service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone()
client_config, spec, net_settings, Path::new(&conf.path()), miner.clone(), !conf.args.flag_no_network
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);
@ -209,7 +209,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
let network_settings = Arc::new(conf.network_settings());
// Sync
let sync = EthSync::register(service.network(), sync_config, client.clone());
let sync = EthSync::new(sync_config, client.clone());
EthSync::register(&*service.network(), sync.clone()).unwrap_or_else(|e| die_with_error("Error registering eth protocol handler", UtilError::from(e).into()));
let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_port: conf.signer_port(),
@ -270,8 +271,9 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
info: Informant::new(conf.have_color()),
sync: sync.clone(),
accounts: account_service.clone(),
network: service.network(),
});
service.io().register_handler(io_handler).expect("Error registering IO handler");
service.register_io_handler(io_handler).expect("Error registering IO handler");
if conf.args.cmd_ui {
url::open("http://localhost:8080/")
@ -314,7 +316,7 @@ fn execute_export(conf: Configuration) {
// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);
@ -385,7 +387,7 @@ fn execute_import(conf: Configuration) {
// Build client
let service = ClientService::start(
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()),
client_config, spec, net_settings, Path::new(&conf.path()), Arc::new(Miner::default()), false
).unwrap_or_else(|e| die_with_error("Client", e));
panic_handler.forward_from(&service);

View File

@ -122,10 +122,10 @@ fn list_apis(apis: ApiSet) -> Vec<Api> {
match apis {
ApiSet::List(apis) => apis,
ApiSet::UnsafeContext => {
vec![Api::Web3, Api::Net, Api::Eth, Api::Personal, Api::Ethcore, Api::EthcoreSet, Api::Traces, Api::Rpc]
vec![Api::Web3, Api::Net, Api::Eth, Api::Personal, Api::Ethcore, Api::Traces, Api::Rpc]
},
_ => {
vec![Api::Web3, Api::Net, Api::Eth, Api::Personal, Api::Signer, Api::Ethcore, Api::EthcoreSet, Api::Traces, Api::Rpc]
vec![Api::Web3, Api::Net, Api::Eth, Api::Personal, Api::Signer, Api::Ethcore, Api::Traces, Api::Rpc]
},
}
}
@ -147,7 +147,7 @@ pub fn setup_rpc<T: Extendable>(server: T, deps: Arc<Dependencies>, apis: ApiSet
server.add_delegate(EthFilterClient::new(&deps.client, &deps.miner).to_delegate());
if deps.signer_port.is_some() {
server.add_delegate(EthSigningQueueClient::new(&deps.signer_queue).to_delegate());
server.add_delegate(EthSigningQueueClient::new(&deps.signer_queue, &deps.miner).to_delegate());
} else {
server.add_delegate(EthSigningUnsafeClient::new(&deps.client, &deps.secret_store, &deps.miner).to_delegate());
}

View File

@ -421,9 +421,15 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM> where
fn transaction_receipt(&self, params: Params) -> Result<Value, Error> {
from_params::<(H256,)>(params)
.and_then(|(hash,)| {
let client = take_weak!(self.client);
let receipt = client.transaction_receipt(TransactionID::Hash(hash));
to_value(&receipt.map(Receipt::from))
let miner = take_weak!(self.miner);
match miner.pending_receipts().get(&hash) {
Some(receipt) => to_value(&Receipt::from(receipt.clone())),
None => {
let client = take_weak!(self.client);
let receipt = client.transaction_receipt(TransactionID::Hash(hash));
to_value(&receipt.map(Receipt::from))
}
}
})
}

View File

@ -24,25 +24,40 @@ use util::numbers::*;
use util::keys::store::AccountProvider;
use v1::helpers::{SigningQueue, ConfirmationsQueue};
use v1::traits::EthSigning;
use v1::types::TransactionRequest;
use v1::types::{TransactionRequest, Bytes};
use v1::impls::{sign_and_dispatch, error_codes};
/// Implementation of functions that require signing when no trusted signer is used.
pub struct EthSigningQueueClient {
pub struct EthSigningQueueClient<M: MinerService> {
queue: Weak<ConfirmationsQueue>,
miner: Weak<M>,
}
impl EthSigningQueueClient {
impl<M: MinerService> EthSigningQueueClient<M> {
/// Creates a new signing queue client given shared signing queue.
pub fn new(queue: &Arc<ConfirmationsQueue>) -> Self {
pub fn new(queue: &Arc<ConfirmationsQueue>, miner: &Arc<M>) -> Self {
EthSigningQueueClient {
queue: Arc::downgrade(queue),
miner: Arc::downgrade(miner),
}
}
fn fill_optional_fields(&self, miner: Arc<M>, mut request: TransactionRequest) -> TransactionRequest {
if let None = request.gas {
request.gas = Some(miner.sensible_gas_limit());
}
if let None = request.gas_price {
request.gas_price = Some(miner.sensible_gas_price());
}
if let None = request.data {
request.data = Some(Bytes::new(Vec::new()));
}
request
}
}
impl EthSigning for EthSigningQueueClient {
impl<M: MinerService + 'static> EthSigning for EthSigningQueueClient<M> {
fn sign(&self, _params: Params) -> Result<Value, Error> {
warn!("Invoking eth_sign is not yet supported with signer enabled.");
@ -54,6 +69,8 @@ impl EthSigning for EthSigningQueueClient {
from_params::<(TransactionRequest, )>(params)
.and_then(|(request, )| {
let queue = take_weak!(self.queue);
let miner = take_weak!(self.miner);
let request = self.fill_optional_fields(miner, request);
let id = queue.add_request(request);
let result = id.wait_with_timeout();
result.unwrap_or_else(|| to_value(&H256::new()))

View File

@ -47,4 +47,14 @@ impl<S> Net for NetClient<S> where S: SyncProvider + 'static {
// right now (11 march 2016), we are always listening for incoming connections
Ok(Value::Bool(true))
}
fn start_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).start_network();
Ok(Value::Bool(true))
}
fn stop_network(&self, _: Params) -> Result<Value, Error> {
take_weak!(self.sync).stop_network();
Ok(Value::Bool(true))
}
}

View File

@ -59,5 +59,11 @@ impl SyncProvider for TestSyncProvider {
fn status(&self) -> SyncStatus {
self.status.read().unwrap().clone()
}
fn start_network(&self) {
}
fn stop_network(&self) {
}
}

View File

@ -19,21 +19,25 @@ use jsonrpc_core::IoHandler;
use v1::impls::EthSigningQueueClient;
use v1::traits::EthSigning;
use v1::helpers::{ConfirmationsQueue, SigningQueue};
use v1::tests::helpers::TestMinerService;
use util::keys::TestAccount;
struct EthSigningTester {
pub queue: Arc<ConfirmationsQueue>,
pub miner: Arc<TestMinerService>,
pub io: IoHandler,
}
impl Default for EthSigningTester {
fn default() -> Self {
let queue = Arc::new(ConfirmationsQueue::default());
let miner = Arc::new(TestMinerService::default());
let io = IoHandler::new();
io.add_delegate(EthSigningQueueClient::new(&queue).to_delegate());
io.add_delegate(EthSigningQueueClient::new(&queue, &miner).to_delegate());
EthSigningTester {
queue: queue,
miner: miner,
io: io,
}
}

View File

@ -30,6 +30,12 @@ pub trait Net: Sized + Send + Sync + 'static {
/// Otherwise false.
fn is_listening(&self, _: Params) -> Result<Value, Error>;
/// Start the network.
fn start_network(&self, _: Params) -> Result<Value, Error>;
/// Stop the network.
fn stop_network(&self, _: Params) -> Result<Value, Error>;
/// Should be used to convert object to io delegate.
fn to_delegate(self) -> IoDelegate<Self> {
let mut delegate = IoDelegate::new(Arc::new(self));

View File

@ -17,23 +17,23 @@
use util::numbers::U256;
use util::hash::{Address, H256};
use v1::types::Log;
use ethcore::receipt::LocalizedReceipt;
use ethcore::receipt::{Receipt as EthReceipt, LocalizedReceipt};
/// Receipt
#[derive(Debug, Serialize)]
pub struct Receipt {
/// Transaction Hash
#[serde(rename="transactionHash")]
pub transaction_hash: H256,
pub transaction_hash: Option<H256>,
/// Transaction index
#[serde(rename="transactionIndex")]
pub transaction_index: U256,
pub transaction_index: Option<U256>,
/// Block hash
#[serde(rename="blockHash")]
pub block_hash: H256,
pub block_hash: Option<H256>,
/// Block number
#[serde(rename="blockNumber")]
pub block_number: U256,
pub block_number: Option<U256>,
/// Cumulative gas used
#[serde(rename="cumulativeGasUsed")]
pub cumulative_gas_used: U256,
@ -50,10 +50,10 @@ pub struct Receipt {
impl From<LocalizedReceipt> for Receipt {
fn from(r: LocalizedReceipt) -> Self {
Receipt {
transaction_hash: r.transaction_hash,
transaction_index: U256::from(r.transaction_index),
block_hash: r.block_hash,
block_number: U256::from(r.block_number),
transaction_hash: Some(r.transaction_hash),
transaction_index: Some(U256::from(r.transaction_index)),
block_hash: Some(r.block_hash),
block_number: Some(U256::from(r.block_number)),
cumulative_gas_used: r.cumulative_gas_used,
gas_used: r.gas_used,
contract_address: r.contract_address,
@ -62,6 +62,21 @@ impl From<LocalizedReceipt> for Receipt {
}
}
impl From<EthReceipt> for Receipt {
fn from(r: EthReceipt) -> Self {
Receipt {
transaction_hash: None,
transaction_index: None,
block_hash: None,
block_number: None,
cumulative_gas_used: r.gas_used,
gas_used: r.gas_used,
contract_address: None,
logs: r.logs.into_iter().map(From::from).collect(),
}
}
}
#[cfg(test)]
mod tests {
use serde_json;
@ -74,10 +89,10 @@ mod tests {
let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x04510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x00","logIndex":"0x01","type":"mined"}]}"#;
let receipt = Receipt {
transaction_hash: H256::zero(),
transaction_index: U256::zero(),
block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(),
block_number: U256::from(0x4510c),
transaction_hash: Some(H256::zero()),
transaction_index: Some(U256::zero()),
block_hash: Some(H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap()),
block_number: Some(U256::from(0x4510c)),
cumulative_gas_used: U256::from(0x20),
gas_used: U256::from(0x10),
contract_address: None,

View File

@ -356,6 +356,10 @@ impl ChainSync {
};
trace!(target: "sync", "New peer {} (protocol: {}, network: {:?}, difficulty: {:?}, latest:{}, genesis:{})", peer_id, peer.protocol_version, peer.network_id, peer.difficulty, peer.latest_hash, peer.genesis);
if io.is_expired() {
trace!("Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id));
return Ok(());
}
if self.peers.contains_key(&peer_id) {
warn!("Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id));
@ -454,12 +458,21 @@ impl ChainSync {
// Disable the peer for this syncing round if it gives invalid chain
if !valid_response {
trace!(target: "sync", "{} Deactivated for invalid headers response", peer_id);
<<<<<<< HEAD
self.deactivate_peer(io, peer_id);
}
if headers.is_empty() && self.state == SyncState::ChainHead {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
=======
self.deactivate_peer(io, peer_id);
}
if headers.is_empty() {
// Peer does not have any new subchain heads, deactivate it nd try with another
trace!(target: "sync", "{} Deactivated for no data", peer_id);
self.deactivate_peer(io, peer_id);
>>>>>>> master
}
match self.state {
SyncState::ChainHead => {

View File

@ -41,6 +41,8 @@ pub trait SyncIo {
fn is_chain_queue_empty(&self) -> bool {
self.chain().queue_info().is_empty()
}
/// Check if the session is expired
fn is_expired(&self) -> bool;
}
/// Wraps `NetworkContext` and the blockchain client
@ -83,6 +85,10 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
fn peer_info(&self, peer_id: PeerId) -> String {
self.network.peer_info(peer_id)
}
fn is_expired(&self) -> bool {
self.network.is_expired()
}
}

View File

@ -41,11 +41,13 @@
//! use ethcore::miner::Miner;
//!
//! fn main() {
//! let mut service = NetworkService::start(NetworkConfiguration::new()).unwrap();
//! let mut service = NetworkService::new(NetworkConfiguration::new()).unwrap();
//! service.start().unwrap();
//! let dir = env::temp_dir();
//! let client = Client::new(ClientConfig::default(), ethereum::new_frontier(), &dir, Arc::new(Miner::default()), service.io().channel()).unwrap();
//! let miner = Miner::new(false, ethereum::new_frontier());
//! EthSync::register(&mut service, SyncConfig::default(), client);
//! let sync = EthSync::new(SyncConfig::default(), client);
//! EthSync::register(&mut service, sync);
//! }
//! ```
@ -66,8 +68,10 @@ use util::network::{NetworkProtocolHandler, NetworkService, NetworkContext, Peer
use util::TimerToken;
use util::{U256, ONE_U256};
use ethcore::client::Client;
use ethcore::service::SyncMessage;
use ethcore::service::{SyncMessage, NetSyncMessage};
use io::NetSyncIo;
use util::io::IoChannel;
use util::{NetworkIoMessage, NetworkError};
use chain::ChainSync;
mod chain;
@ -98,6 +102,10 @@ impl Default for SyncConfig {
pub trait SyncProvider: Send + Sync {
/// Get sync status
fn status(&self) -> SyncStatus;
/// Start the network
fn start_network(&self);
/// Stop the network
fn stop_network(&self);
}
/// Ethereum network protocol handler
@ -105,23 +113,30 @@ pub struct EthSync {
/// Shared blockchain client. TODO: this should evetually become an IPC endpoint
chain: Arc<Client>,
/// Sync strategy
sync: RwLock<ChainSync>
sync: RwLock<ChainSync>,
/// IO communication chnnel.
io_channel: RwLock<IoChannel<NetSyncMessage>>,
}
pub use self::chain::{SyncStatus, SyncState};
impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
pub fn new(config: SyncConfig, chain: Arc<Client>) -> Arc<EthSync> {
let sync = ChainSync::new(config, chain.deref());
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(sync),
io_channel: RwLock::new(IoChannel::disconnected()),
});
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync
}
/// Register protocol with the network service
pub fn register(service: &NetworkService<SyncMessage>, sync: Arc<EthSync>) -> Result<(), NetworkError> {
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8])
}
/// Stop sync
pub fn stop(&mut self, io: &mut NetworkContext<SyncMessage>) {
self.sync.write().unwrap().abort(&mut NetSyncIo::new(io, self.chain.deref()));
@ -138,11 +153,20 @@ impl SyncProvider for EthSync {
fn status(&self) -> SyncStatus {
self.sync.read().unwrap().status()
}
fn start_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StartNetwork)).expect("Error sending IO notification");
}
fn stop_network(&self) {
self.io_channel.read().unwrap().send(NetworkIoMessage::User(SyncMessage::StopNetwork)).expect("Error sending IO notification");
}
}
impl NetworkProtocolHandler<SyncMessage> for EthSync {
fn initialize(&self, io: &NetworkContext<SyncMessage>) {
io.register_timer(0, 1000).expect("Error registering sync timer");
*self.io_channel.write().unwrap() = io.io_channel();
}
fn read(&self, io: &NetworkContext<SyncMessage>, peer: &PeerId, packet_id: u8, data: &[u8]) {

View File

@ -192,3 +192,13 @@ fn restart_on_broken_chain() {
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}
#[test]
fn high_td_attach() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block_parent(6);
net.sync_steps(20);
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}

View File

@ -43,6 +43,10 @@ impl<'p> SyncIo for TestIo<'p> {
fn disconnect_peer(&mut self, _peer_id: PeerId) {
}
fn is_expired(&self) -> bool {
false
}
fn respond(&mut self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
self.queue.push_back(TestPacket {
data: data,

View File

@ -142,7 +142,7 @@ mod tests {
#[test]
fn test_service_register_handler () {
let mut service = IoService::<MyMessage>::start().expect("Error creating network service");
let service = IoService::<MyMessage>::start().expect("Error creating network service");
service.register_handler(Arc::new(MyHandler)).unwrap();
}

View File

@ -18,9 +18,10 @@ use std::sync::*;
use std::thread::{self, JoinHandle};
use std::collections::HashMap;
use mio::*;
use crossbeam::sync::chase_lev;
use slab::Slab;
use error::*;
use io::{IoError, IoHandler};
use crossbeam::sync::chase_lev;
use io::worker::{Worker, Work, WorkType};
use panics::*;
@ -33,6 +34,7 @@ pub type HandlerId = usize;
/// Maximum number of tokens a handler can use
pub const TOKENS_PER_HANDLER: usize = 16384;
const MAX_HANDLERS: usize = 8;
/// Messages used to communicate with the event loop from other threads.
#[derive(Clone)]
@ -43,6 +45,9 @@ pub enum IoMessage<Message> where Message: Send + Clone + Sized {
AddHandler {
handler: Arc<IoHandler<Message>+Send>,
},
RemoveHandler {
handler_id: HandlerId,
},
AddTimer {
handler_id: HandlerId,
token: TimerToken,
@ -138,6 +143,15 @@ impl<Message> IoContext<Message> where Message: Send + Clone + 'static {
pub fn channel(&self) -> IoChannel<Message> {
self.channel.clone()
}
/// Unregister current IO handler.
pub fn unregister_handler(&self) -> Result<(), IoError> {
try!(self.channel.send_io(IoMessage::RemoveHandler {
handler_id: self.handler,
}));
Ok(())
}
}
#[derive(Clone)]
@ -149,7 +163,7 @@ struct UserTimer {
/// Root IO handler. Manages user handlers, messages and IO timers.
pub struct IoManager<Message> where Message: Send + Sync {
timers: Arc<RwLock<HashMap<HandlerId, UserTimer>>>,
handlers: Vec<Arc<IoHandler<Message>>>,
handlers: Slab<Arc<IoHandler<Message>>, HandlerId>,
workers: Vec<Worker>,
worker_channel: chase_lev::Worker<Work<Message>>,
work_ready: Arc<Condvar>,
@ -175,7 +189,7 @@ impl<Message> IoManager<Message> where Message: Send + Sync + Clone + 'static {
let mut io = IoManager {
timers: Arc::new(RwLock::new(HashMap::new())),
handlers: Vec::new(),
handlers: Slab::new(MAX_HANDLERS),
worker_channel: worker,
workers: workers,
work_ready: work_ready,
@ -192,36 +206,31 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
fn ready(&mut self, _event_loop: &mut EventLoop<Self>, token: Token, events: EventSet) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected stream token: {}", token.as_usize());
}
let handler = self.handlers[handler_index].clone();
if events.is_hup() {
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
}
else {
if events.is_readable() {
self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index });
if let Some(handler) = self.handlers.get(handler_index) {
if events.is_hup() {
self.worker_channel.push(Work { work_type: WorkType::Hup, token: token_id, handler: handler.clone(), handler_id: handler_index });
}
if events.is_writable() {
self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index });
else {
if events.is_readable() {
self.worker_channel.push(Work { work_type: WorkType::Readable, token: token_id, handler: handler.clone(), handler_id: handler_index });
}
if events.is_writable() {
self.worker_channel.push(Work { work_type: WorkType::Writable, token: token_id, handler: handler.clone(), handler_id: handler_index });
}
}
self.work_ready.notify_all();
}
self.work_ready.notify_all();
}
fn timeout(&mut self, event_loop: &mut EventLoop<Self>, token: Token) {
let handler_index = token.as_usize() / TOKENS_PER_HANDLER;
let token_id = token.as_usize() % TOKENS_PER_HANDLER;
if handler_index >= self.handlers.len() {
panic!("Unexpected timer token: {}", token.as_usize());
}
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
let handler = self.handlers[handler_index].clone();
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler, handler_id: handler_index });
self.work_ready.notify_all();
if let Some(handler) = self.handlers.get(handler_index) {
if let Some(timer) = self.timers.read().unwrap().get(&token.as_usize()) {
event_loop.timeout_ms(token, timer.delay).expect("Error re-registering user timer");
self.worker_channel.push(Work { work_type: WorkType::Timeout, token: token_id, handler: handler.clone(), handler_id: handler_index });
self.work_ready.notify_all();
}
}
}
@ -232,12 +241,13 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
event_loop.shutdown();
},
IoMessage::AddHandler { handler } => {
let handler_id = {
self.handlers.push(handler.clone());
self.handlers.len() - 1
};
let handler_id = self.handlers.insert(handler.clone()).unwrap_or_else(|_| panic!("Too many handlers registered"));
handler.initialize(&IoContext::new(IoChannel::new(event_loop.channel()), handler_id));
},
IoMessage::RemoveHandler { handler_id } => {
// TODO: flush event loop
self.handlers.remove(handler_id);
},
IoMessage::AddTimer { handler_id, token, delay } => {
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
let timeout = event_loop.timeout_ms(Token(timer_id), delay).expect("Error registering user timer");
@ -250,26 +260,32 @@ impl<Message> Handler for IoManager<Message> where Message: Send + Clone + Sync
}
},
IoMessage::RegisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
if let Some(handler) = self.handlers.get(handler_id) {
handler.register_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
}
},
IoMessage::DeregisterStream { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
if let Some(handler) = self.handlers.get(handler_id) {
handler.deregister_stream(token, event_loop);
// unregister a timer associated with the token (if any)
let timer_id = token + handler_id * TOKENS_PER_HANDLER;
if let Some(timer) = self.timers.write().unwrap().remove(&timer_id) {
event_loop.clear_timeout(timer.timeout);
}
}
},
IoMessage::UpdateStreamRegistration { handler_id, token } => {
let handler = self.handlers.get(handler_id).expect("Unknown handler id").clone();
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
if let Some(handler) = self.handlers.get(handler_id) {
handler.update_stream(token, Token(token + handler_id * TOKENS_PER_HANDLER), event_loop);
}
},
IoMessage::UserMessage(data) => {
for n in 0 .. self.handlers.len() {
let handler = self.handlers[n].clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: n });
//TODO: better way to iterate the slab
for id in 0 .. MAX_HANDLERS {
if let Some(h) = self.handlers.get(id) {
let handler = h.clone();
self.worker_channel.push(Work { work_type: WorkType::Message(data.clone()), token: 0, handler: handler, handler_id: id });
}
}
self.work_ready.notify_all();
}
@ -351,8 +367,8 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
})
}
/// Regiter a IO hadnler with the event loop.
pub fn register_handler(&mut self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
/// Regiter an IO handler with the event loop.
pub fn register_handler(&self, handler: Arc<IoHandler<Message>+Send>) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::AddHandler {
handler: handler,
}));
@ -360,13 +376,13 @@ impl<Message> IoService<Message> where Message: Send + Sync + Clone + 'static {
}
/// Send a message over the network. Normaly `HostIo::send` should be used. This can be used from non-io threads.
pub fn send_message(&mut self, message: Message) -> Result<(), IoError> {
pub fn send_message(&self, message: Message) -> Result<(), IoError> {
try!(self.host_channel.send(IoMessage::UserMessage(message)));
Ok(())
}
/// Create a new message channel
pub fn channel(&mut self) -> IoChannel<Message> {
pub fn channel(&self) -> IoChannel<Message> {
IoChannel { channel: Some(self.host_channel.clone()) }
}
}

View File

@ -18,7 +18,7 @@ use std::net::{SocketAddr};
use std::collections::{HashMap};
use std::str::{FromStr};
use std::sync::*;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::ops::*;
use std::cmp::min;
use std::path::{Path, PathBuf};
@ -50,7 +50,7 @@ const MAX_HANDSHAKES: usize = 80;
const MAX_HANDSHAKES_PER_ROUND: usize = 32;
const MAINTENANCE_TIMEOUT: u64 = 1000;
#[derive(Debug)]
#[derive(Debug, Clone)]
/// Network service configuration
pub struct NetworkConfiguration {
/// Directory path to store network configuration. None means nothing will be saved
@ -234,6 +234,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::User(msg));
}
/// Send an IO message
pub fn io_channel(&self) -> IoChannel<NetworkIoMessage<Message>> {
self.io.channel()
}
/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
@ -245,6 +250,11 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone
self.io.message(NetworkIoMessage::Disconnect(peer));
}
/// Sheck if the session is till active.
pub fn is_expired(&self) -> bool {
self.session.as_ref().map(|s| s.lock().unwrap().expired()).unwrap_or(false)
}
/// Register a new IO timer. 'IoHandler::timeout' will be called with the token.
pub fn register_timer(&self, token: TimerToken, ms: u64) -> Result<(), UtilError> {
self.io.message(NetworkIoMessage::AddTimer {
@ -324,11 +334,12 @@ pub struct Host<Message> where Message: Send + Sync + Clone {
stats: Arc<NetworkStats>,
pinned_nodes: Vec<NodeId>,
num_sessions: AtomicUsize,
stopping: AtomicBool,
}
impl<Message> Host<Message> where Message: Send + Sync + Clone {
/// Create a new instance
pub fn new(config: NetworkConfiguration) -> Result<Host<Message>, UtilError> {
pub fn new(config: NetworkConfiguration, stats: Arc<NetworkStats>) -> Result<Host<Message>, UtilError> {
let mut listen_address = match config.listen_address {
None => SocketAddr::from_str("0.0.0.0:30304").unwrap(),
Some(addr) => addr,
@ -372,9 +383,10 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
handlers: RwLock::new(HashMap::new()),
timers: RwLock::new(HashMap::new()),
timer_counter: RwLock::new(USER_TIMER),
stats: Arc::new(NetworkStats::default()),
stats: stats,
pinned_nodes: Vec::new(),
num_sessions: AtomicUsize::new(0),
stopping: AtomicBool::new(false),
};
let boot_nodes = host.info.read().unwrap().config.boot_nodes.clone();
@ -384,10 +396,6 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
Ok(host)
}
pub fn stats(&self) -> Arc<NetworkStats> {
self.stats.clone()
}
pub fn add_node(&mut self, id: &str) {
match Node::from_str(id) {
Err(e) => { debug!(target: "network", "Could not add node {}: {:?}", id, e); },
@ -402,8 +410,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
}
pub fn client_version(&self) -> String {
self.info.read().unwrap().client_version.clone()
pub fn client_version() -> String {
version()
}
pub fn external_url(&self) -> Option<String> {
@ -416,6 +424,22 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
r
}
pub fn stop(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
self.stopping.store(true, AtomicOrdering::Release);
let mut to_kill = Vec::new();
for e in self.sessions.write().unwrap().iter_mut() {
let mut s = e.lock().unwrap();
s.disconnect(io, DisconnectReason::ClientQuit);
to_kill.push(s.token());
}
for p in to_kill {
trace!(target: "network", "Disconnecting on shutdown: {}", p);
self.kill_connection(p, io, true);
}
try!(io.unregister_handler());
Ok(())
}
fn init_public_interface(&self, io: &IoContext<NetworkIoMessage<Message>>) -> Result<(), UtilError> {
io.clear_timer(INIT_PUBLIC).unwrap();
if self.info.read().unwrap().public_endpoint.is_some() {
@ -787,6 +811,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn stream_readable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io),
DISCOVERY => {
@ -802,6 +829,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn stream_writable(&self, io: &IoContext<NetworkIoMessage<Message>>, stream: StreamToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match stream {
FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io),
DISCOVERY => {
@ -813,6 +843,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn timeout(&self, io: &IoContext<NetworkIoMessage<Message>>, token: TimerToken) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match token {
IDLE => self.maintain_network(io),
INIT_PUBLIC => self.init_public_interface(io).unwrap_or_else(|e|
@ -835,8 +868,8 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
},
_ => match self.timers.read().unwrap().get(&token).cloned() {
Some(timer) => match self.handlers.read().unwrap().get(timer.protocol).cloned() {
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
None => { warn!(target: "network", "No handler found for protocol: {:?}", timer.protocol) },
Some(h) => { h.timeout(&NetworkContext::new(io, timer.protocol, None, self.sessions.clone()), timer.token); }
},
None => { warn!("Unknown timer token: {}", token); } // timer is not registerd through us
}
@ -844,6 +877,9 @@ impl<Message> IoHandler<NetworkIoMessage<Message>> for Host<Message> where Messa
}
fn message(&self, io: &IoContext<NetworkIoMessage<Message>>, message: &NetworkIoMessage<Message>) {
if self.stopping.load(AtomicOrdering::Acquire) {
return;
}
match *message {
NetworkIoMessage::AddHandler {
ref handler,
@ -1009,6 +1045,6 @@ fn host_client_url() {
let mut config = NetworkConfiguration::new();
let key = h256_from_hex("6f7b0d801bc7b5ce7bbd930b84fd0369b3eb25d09be58d64ba811091046f3aa2");
config.use_secret = Some(key);
let host: Host<u32> = Host::new(config).unwrap();
let host: Host<u32> = Host::new(config, Arc::new(NetworkStats::new())).unwrap();
assert!(host.local_url().starts_with("enode://101b3ef5a4ea7a1c7928e24c4c75fd053c235d7b80c22ae5c03d145d0ac7396e2a4ffff9adee3133a7b05044a5cee08115fd65145e5165d646bde371010d803c@"));
}

View File

@ -56,8 +56,9 @@
//! }
//!
//! fn main () {
//! let mut service = NetworkService::<MyMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
//! let mut service = NetworkService::<MyMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
//! service.register_protocol(Arc::new(MyHandler), "myproto", &[1u8]);
//! service.start().expect("Error starting service");
//!
//! // Wait for quit condition
//! // ...

View File

@ -28,33 +28,33 @@ use io::*;
pub struct NetworkService<Message> where Message: Send + Sync + Clone + 'static {
io_service: IoService<NetworkIoMessage<Message>>,
host_info: String,
host: Arc<Host<Message>>,
host: RwLock<Option<Arc<Host<Message>>>>,
stats: Arc<NetworkStats>,
panic_handler: Arc<PanicHandler>
panic_handler: Arc<PanicHandler>,
config: NetworkConfiguration,
}
impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'static {
/// Starts IO event loop
pub fn start(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
pub fn new(config: NetworkConfiguration) -> Result<NetworkService<Message>, UtilError> {
let panic_handler = PanicHandler::new_in_arc();
let mut io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
let io_service = try!(IoService::<NetworkIoMessage<Message>>::start());
panic_handler.forward_from(&io_service);
let host = Arc::new(try!(Host::new(config)));
let stats = host.stats().clone();
let host_info = host.client_version();
try!(io_service.register_handler(host.clone()));
let stats = Arc::new(NetworkStats::new());
let host_info = Host::<Message>::client_version();
Ok(NetworkService {
io_service: io_service,
host_info: host_info,
stats: stats,
panic_handler: panic_handler,
host: host,
host: RwLock::new(None),
config: config,
})
}
/// Regiter a new protocol handler with the event loop.
pub fn register_protocol(&mut self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
pub fn register_protocol(&self, handler: Arc<NetworkProtocolHandler<Message>+Send + Sync>, protocol: ProtocolId, versions: &[u8]) -> Result<(), NetworkError> {
try!(self.io_service.send_message(NetworkIoMessage::AddHandler {
handler: handler,
protocol: protocol,
@ -69,8 +69,8 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
}
/// Returns underlying io service.
pub fn io(&mut self) -> &mut IoService<NetworkIoMessage<Message>> {
&mut self.io_service
pub fn io(&self) -> &IoService<NetworkIoMessage<Message>> {
&self.io_service
}
/// Returns network statistics.
@ -80,12 +80,36 @@ impl<Message> NetworkService<Message> where Message: Send + Sync + Clone + 'stat
/// Returns external url if available.
pub fn external_url(&self) -> Option<String> {
self.host.external_url()
let host = self.host.read().unwrap();
host.as_ref().and_then(|h| h.external_url())
}
/// Returns external url if available.
pub fn local_url(&self) -> String {
self.host.local_url()
pub fn local_url(&self) -> Option<String> {
let host = self.host.read().unwrap();
host.as_ref().map(|h| h.local_url())
}
/// Start network IO
pub fn start(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if host.is_none() {
let h = Arc::new(try!(Host::new(self.config.clone(), self.stats.clone())));
try!(self.io_service.register_handler(h.clone()));
*host = Some(h);
}
Ok(())
}
/// Stop network IO
pub fn stop(&self) -> Result<(), UtilError> {
let mut host = self.host.write().unwrap();
if let Some(ref host) = *host {
let io = IoContext::new(self.io_service.channel(), 0); //TODO: take token id from host
try!(host.stop(&io));
}
*host = None;
Ok(())
}
}

View File

@ -65,7 +65,7 @@ impl NetworkStats {
self.sessions.load(Ordering::Relaxed)
}
#[cfg(test)]
/// Create a new empty instance.
pub fn new() -> NetworkStats {
NetworkStats {
recv: AtomicUsize::new(0),

View File

@ -97,7 +97,8 @@ impl NetworkProtocolHandler<TestProtocolMessage> for TestProtocol {
#[test]
fn net_service() {
let mut service = NetworkService::<TestProtocolMessage>::start(NetworkConfiguration::new_local()).expect("Error creating network service");
let service = NetworkService::<TestProtocolMessage>::new(NetworkConfiguration::new_local()).expect("Error creating network service");
service.start().unwrap();
service.register_protocol(Arc::new(TestProtocol::new(false)), "myproto", &[1u8]).unwrap();
}
@ -108,12 +109,14 @@ fn net_connect() {
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let mut config2 = NetworkConfiguration::new_local();
info!("net_connect: local URL: {}", service1.local_url());
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
info!("net_connect: local URL: {}", service1.local_url().unwrap());
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, false);
while !handler1.got_packet() && !handler2.got_packet() && (service1.stats().sessions() == 0 || service2.stats().sessions() == 0) {
thread::sleep(Duration::from_millis(50));
@ -122,17 +125,28 @@ fn net_connect() {
assert!(service2.stats().sessions() >= 1);
}
#[test]
fn net_start_stop() {
let config = NetworkConfiguration::new_local();
let service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
service.start().unwrap();
service.stop().unwrap();
service.start().unwrap();
}
#[test]
fn net_disconnect() {
let key1 = KeyPair::create().unwrap();
let mut config1 = NetworkConfiguration::new_local();
config1.use_secret = Some(key1.secret().clone());
config1.boot_nodes = vec![ ];
let mut service1 = NetworkService::<TestProtocolMessage>::start(config1).unwrap();
let mut service1 = NetworkService::<TestProtocolMessage>::new(config1).unwrap();
service1.start().unwrap();
let handler1 = TestProtocol::register(&mut service1, false);
let mut config2 = NetworkConfiguration::new_local();
config2.boot_nodes = vec![ service1.local_url() ];
let mut service2 = NetworkService::<TestProtocolMessage>::start(config2).unwrap();
config2.boot_nodes = vec![ service1.local_url().unwrap() ];
let mut service2 = NetworkService::<TestProtocolMessage>::new(config2).unwrap();
service2.start().unwrap();
let handler2 = TestProtocol::register(&mut service2, true);
while !(handler1.got_disconnect() && handler2.got_disconnect()) {
thread::sleep(Duration::from_millis(50));
@ -144,7 +158,8 @@ fn net_disconnect() {
#[test]
fn net_timeout() {
let config = NetworkConfiguration::new_local();
let mut service = NetworkService::<TestProtocolMessage>::start(config).unwrap();
let mut service = NetworkService::<TestProtocolMessage>::new(config).unwrap();
service.start().unwrap();
let handler = TestProtocol::register(&mut service, false);
while !handler.got_timeout() {
thread::sleep(Duration::from_millis(50));