diff --git a/Cargo.lock b/Cargo.lock index 16046c480..1bf522580 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,7 +2,7 @@ name = "wasm" version = "0.1.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-bigint 0.1.3", "ethcore-logger 1.8.0", "ethcore-util 1.8.0", @@ -129,7 +129,7 @@ name = "bigint" version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -141,7 +141,7 @@ name = "bincode" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -210,7 +210,7 @@ name = "bn" version = "0.4.4" source = "git+https://github.com/paritytech/bn#b97e95a45f4484a41a515338c4f0e093bf6675e0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -222,7 +222,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "byteorder" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -230,7 +230,7 @@ name = "bytes" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -507,7 +507,7 @@ dependencies = [ "bloomable 0.1.0", "bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bn 0.4.4 (git+https://github.com/paritytech/bn)", - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.103 (registry+https://github.com/rust-lang/crates.io-index)", "common-types 0.1.0", "crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -742,7 +742,7 @@ dependencies = [ name = "ethcore-secretstore" version = "1.0.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore 1.8.0", "ethcore-bigint 0.1.3", @@ -855,7 +855,7 @@ dependencies = [ name = "ethkey" version = "0.2.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "eth-secp256k1 0.5.6 (git+https://github.com/paritytech/rust-secp256k1)", "ethcore-bigint 0.1.3", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -948,7 +948,7 @@ name = "evm" version = "0.1.0" dependencies = [ "bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "common-types 0.1.0", "ethcore-bigint 0.1.3", "ethcore-logger 1.8.0", @@ -1421,7 +1421,7 @@ name = "libflate" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1634,7 +1634,7 @@ dependencies = [ name = "native-contracts" version = "0.1.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-bigint 0.1.3", "futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2062,6 +2062,7 @@ dependencies = [ "ethcore-ipc 1.8.0", "ethcore-light 1.8.0", "ethcore-logger 1.8.0", + "ethcore-network 1.8.0", "ethcore-util 1.8.0", "ethcrypto 0.1.0", "ethjson 0.1.0", @@ -2190,7 +2191,7 @@ name = "parity-wasm" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2200,7 +2201,7 @@ name = "parity-whisper" version = "0.1.0" dependencies = [ "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-bigint 0.1.3", "ethcore-network 1.8.0", "ethcrypto 0.1.0", @@ -2504,7 +2505,7 @@ dependencies = [ name = "rlp" version = "0.2.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-bigint 0.1.3", "lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3257,7 +3258,7 @@ dependencies = [ name = "vm" version = "0.1.0" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "common-types 0.1.0", "ethcore-bigint 0.1.3", "ethcore-util 1.8.0", @@ -3301,7 +3302,7 @@ name = "ws" version = "0.7.1" source = "git+https://github.com/tomusdrw/ws-rs#f8306a798b7541d64624299a83a2c934f173beed" dependencies = [ - "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3380,7 +3381,7 @@ dependencies = [ "checksum bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f421095d2a76fc24cd3fb3f912b90df06be7689912b1bdb423caefae59c258d" "checksum bn 0.4.4 (git+https://github.com/paritytech/bn)" = "" "checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855" -"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8" +"checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d" "checksum bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8b24f16593f445422331a5eed46b72f7f171f910fead4f2ea8f17e727e9c5c14" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" "checksum cid 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "34aa7da06f10541fbca6850719cdaa8fa03060a5d2fb33840f149cf8133a00c7" diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 1e32c8bf1..0a2fd87f1 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -354,14 +354,14 @@ impl FullDependencies { }, Api::Whisper => { if let Some(ref whisper_rpc) = self.whisper_rpc { - let whisper = whisper_rpc.make_handler(); + let whisper = whisper_rpc.make_handler(self.net.clone()); handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper)); } } Api::WhisperPubSub => { if !for_generic_pubsub { if let Some(ref whisper_rpc) = self.whisper_rpc { - let whisper = whisper_rpc.make_handler(); + let whisper = whisper_rpc.make_handler(self.net.clone()); handler.extend_with( ::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper) ); @@ -554,13 +554,13 @@ impl LightDependencies { }, Api::Whisper => { if let Some(ref whisper_rpc) = self.whisper_rpc { - let whisper = whisper_rpc.make_handler(); + let whisper = whisper_rpc.make_handler(self.net.clone()); handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper)); } } Api::WhisperPubSub => { if let Some(ref whisper_rpc) = self.whisper_rpc { - let whisper = whisper_rpc.make_handler(); + let whisper = whisper_rpc.make_handler(self.net.clone()); handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper)); } } diff --git a/parity/run.rs b/parity/run.rs index ee8ce5638..d0f967410 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -246,10 +246,8 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc) -> let mut attached_protos = Vec::new(); let whisper_factory = if cmd.whisper.enabled { - let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size) + let whisper_factory = ::whisper::setup(cmd.whisper.target_message_pool_size, &mut attached_protos) .map_err(|e| format!("Failed to initialize whisper: {}", e))?; - - attached_protos.push(whisper_net); whisper_factory } else { None @@ -638,10 +636,9 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc) -> R let mut attached_protos = Vec::new(); let whisper_factory = if cmd.whisper.enabled { - let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size) + let whisper_factory = ::whisper::setup(cmd.whisper.target_message_pool_size, &mut attached_protos) .map_err(|e| format!("Failed to initialize whisper: {}", e))?; - attached_protos.push(whisper_net); whisper_factory } else { None diff --git a/parity/whisper.rs b/parity/whisper.rs index cab117ae6..f8d33626b 100644 --- a/parity/whisper.rs +++ b/parity/whisper.rs @@ -17,10 +17,11 @@ use std::sync::Arc; use std::io; -use ethsync::AttachedProtocol; +use ethsync::{AttachedProtocol, ManageNetwork}; use parity_rpc::Metadata; -use parity_whisper::net::{self as whisper_net, PoolHandle, Network as WhisperNetwork}; -use parity_whisper::rpc::{WhisperClient, FilterManager}; +use parity_whisper::message::Message; +use parity_whisper::net::{self as whisper_net, Network as WhisperNetwork}; +use parity_whisper::rpc::{WhisperClient, PoolHandle, FilterManager}; /// Whisper config. #[derive(Debug, PartialEq, Eq)] @@ -38,6 +39,31 @@ impl Default for Config { } } +/// Standard pool handle. +pub struct NetPoolHandle { + /// Pool handle. + handle: Arc>>, + /// Network manager. + net: Arc, +} + +impl PoolHandle for NetPoolHandle { + fn relay(&self, message: Message) -> bool { + let mut res = false; + let mut message = Some(message); + self.net.with_proto_context(whisper_net::PROTOCOL_ID, &mut move |ctx| { + if let Some(message) = message.take() { + res = self.handle.post_message(message, ctx); + } + }); + res + } + + fn pool_status(&self) -> whisper_net::PoolStatus { + self.handle.pool_status() + } +} + /// Factory for standard whisper RPC. pub struct RpcFactory { net: Arc>>, @@ -45,8 +71,9 @@ pub struct RpcFactory { } impl RpcFactory { - pub fn make_handler(&self) -> WhisperClient { - WhisperClient::new(self.net.handle(), self.manager.clone()) + pub fn make_handler(&self, net: Arc) -> WhisperClient { + let handle = NetPoolHandle { handle: self.net.clone(), net: net }; + WhisperClient::new(handle, self.manager.clone()) } } @@ -54,24 +81,36 @@ impl RpcFactory { /// /// Will target the given pool size. #[cfg(not(feature = "ipc"))] -pub fn setup(target_pool_size: usize) -> io::Result<(AttachedProtocol, Option)> { +pub fn setup(target_pool_size: usize, protos: &mut Vec) + -> io::Result> +{ let manager = Arc::new(FilterManager::new()?); let net = Arc::new(WhisperNetwork::new(target_pool_size, manager.clone())); - let proto = AttachedProtocol { + protos.push(AttachedProtocol { handler: net.clone() as Arc<_>, packet_count: whisper_net::PACKET_COUNT, versions: whisper_net::SUPPORTED_VERSIONS, - protocol_id: *b"shh", - }; + protocol_id: whisper_net::PROTOCOL_ID, + }); + + // parity-only extensions to whisper. + protos.push(AttachedProtocol { + handler: Arc::new(whisper_net::ParityExtensions), + packet_count: whisper_net::PACKET_COUNT, + versions: whisper_net::SUPPORTED_VERSIONS, + protocol_id: whisper_net::PARITY_PROTOCOL_ID, + }); let factory = RpcFactory { net: net, manager: manager }; - Ok((proto, Some(factory))) + Ok(Some(factory)) } // TODO: make it possible to attach generic protocols in IPC. #[cfg(feature = "ipc")] -pub fn setup(_pool: usize) -> (AttachedProtocol, Option) { - Ok((AttachedProtocol, None)) +pub fn setup(_target_pool_size: usize, _protos: &mut Vec) + -> io::Result> +{ + Ok(None) } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index c213af8df..3d234d2d6 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -62,5 +62,8 @@ hash = { path = "../util/hash" } clippy = { version = "0.0.103", optional = true} pretty_assertions = "0.1" +[dev-dependencies] +ethcore-network = { path = "../util/network" } + [features] dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"] diff --git a/rpc/src/v1/tests/mocked/manage_network.rs b/rpc/src/v1/tests/mocked/manage_network.rs index e5e3fd340..9438429cd 100644 --- a/rpc/src/v1/tests/mocked/manage_network.rs +++ b/rpc/src/v1/tests/mocked/manage_network.rs @@ -15,6 +15,9 @@ // along with Parity. If not, see . use ethsync::{ManageNetwork, NetworkConfiguration}; +use self::ethcore_network::{ProtocolId, NetworkContext}; + +extern crate ethcore_network; pub struct TestManageNetwork; @@ -27,4 +30,5 @@ impl ManageNetwork for TestManageNetwork { fn start_network(&self) {} fn stop_network(&self) {} fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::new_local() } + fn with_proto_context(&self, _: ProtocolId, _: &mut FnMut(&NetworkContext)) { } } diff --git a/sync/src/api.rs b/sync/src/api.rs index 6f4b14970..0dfe51efd 100644 --- a/sync/src/api.rs +++ b/sync/src/api.rs @@ -497,6 +497,8 @@ pub trait ManageNetwork : Send + Sync { fn stop_network(&self); /// Query the current configuration of the network fn network_config(&self) -> NetworkConfiguration; + /// Get network context for protocol. + fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext)); } @@ -538,6 +540,10 @@ impl ManageNetwork for EthSync { fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::from(self.network.config().clone()) } + + fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext)) { + self.network.with_context_eval(proto, f); + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -808,6 +814,10 @@ impl ManageNetwork for LightSync { fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::from(self.network.config().clone()) } + + fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext)) { + self.network.with_context_eval(proto, f); + } } impl LightSyncProvider for LightSync { diff --git a/whisper/src/message.rs b/whisper/src/message.rs index e5202c583..fe6997c01 100644 --- a/whisper/src/message.rs +++ b/whisper/src/message.rs @@ -56,23 +56,18 @@ impl Topic { /// this takes 3 sets of 9 bits, treating each as an index in the range /// 0..512 into the bloom and setting the corresponding bit in the bloom to 1. pub fn bloom_into(&self, bloom: &mut H512) { - let mut set_bit = |idx: usize| { - let idx = idx & 511; - bloom[idx / 8] |= 1 << idx % 8; - }; let data = &self.0; - let mut combined = ((data[0] as usize) << 24) | - ((data[1] as usize) << 16) | - ((data[2] as usize) << 8) | - data[3] as usize; + for i in 0..3 { + let mut idx = data[i] as usize; - // take off the last 5 bits as we only use 27. - combined >>= 5; + if data[3] & (1 << i) != 0 { + idx += 256; + } - set_bit(combined); - set_bit(combined >> 9); - set_bit(combined >> 18); + debug_assert!(idx <= 511); + bloom[idx / 8] |= 1 << (7 - idx % 8); + } } /// Get bloom for single topic. @@ -118,6 +113,7 @@ pub fn bloom_topics(topics: &[Topic]) -> H512 { #[derive(Debug)] pub enum Error { Decoder(DecoderError), + EmptyTopics, LivesTooLong, IssuedInFuture, ZeroTTL, @@ -136,10 +132,27 @@ impl fmt::Display for Error { Error::LivesTooLong => write!(f, "Message claims to be issued before the unix epoch."), Error::IssuedInFuture => write!(f, "Message issued in future."), Error::ZeroTTL => write!(f, "Message live for zero time."), + Error::EmptyTopics => write!(f, "Message has no topics."), } } } +fn append_topics<'a>(s: &'a mut RlpStream, topics: &[Topic]) -> &'a mut RlpStream { + if topics.len() == 1 { + s.append(&topics[0]) + } else { + s.append_list(&topics) + } +} + +fn decode_topics(rlp: UntrustedRlp) -> Result, DecoderError> { + if rlp.is_list() { + rlp.iter().map(|r| r.as_val::()).collect() + } else { + rlp.as_val().map(|t| SmallVec::from_slice(&[t])) + } +} + // Raw envelope struct. #[derive(Clone, Debug, PartialEq, Eq)] pub struct Envelope { @@ -156,15 +169,20 @@ pub struct Envelope { } impl Envelope { + /// Whether the message is multi-topic. Only relay these to Parity peers. + pub fn is_multitopic(&self) -> bool { + self.topics.len() != 1 + } + fn proving_hash(&self) -> H256 { use byteorder::{BigEndian, ByteOrder}; let mut buf = [0; 32]; let mut stream = RlpStream::new_list(4); - stream.append(&self.expiry) - .append(&self.ttl) - .append_list(&self.topics) + stream.append(&self.expiry).append(&self.ttl); + + append_topics(&mut stream, &self.topics) .append(&self.data); let mut digest = Keccak::new_keccak256(); @@ -185,8 +203,9 @@ impl rlp::Encodable for Envelope { fn rlp_append(&self, s: &mut RlpStream) { s.begin_list(5) .append(&self.expiry) - .append(&self.ttl) - .append_list(&self.topics) + .append(&self.ttl); + + append_topics(s, &self.topics) .append(&self.data) .append(&self.nonce); } @@ -199,13 +218,17 @@ impl rlp::Decodable for Envelope { Ok(Envelope { expiry: rlp.val_at(0)?, ttl: rlp.val_at(1)?, - topics: rlp.at(2)?.iter().map(|x| x.as_val()).collect::>()?, + topics: decode_topics(rlp.at(2)?)?, data: rlp.val_at(3)?, nonce: rlp.val_at(4)?, }) } } +/// Error indicating no topics. +#[derive(Debug, Copy, Clone)] +pub struct EmptyTopics; + /// Message creation parameters. /// Pass this to `Message::create` to make a message. pub struct CreateParams { @@ -213,7 +236,7 @@ pub struct CreateParams { pub ttl: u64, /// payload data. pub payload: Vec, - /// Topics. + /// Topics. May not be empty. pub topics: Vec, /// How many milliseconds to spend proving work. pub work: u64, @@ -231,10 +254,12 @@ pub struct Message { impl Message { /// Create a message from creation parameters. /// Panics if TTL is 0. - pub fn create(params: CreateParams) -> Self { + pub fn create(params: CreateParams) -> Result { use byteorder::{BigEndian, ByteOrder}; use rand::{Rng, SeedableRng, XorShiftRng}; + if params.topics.is_empty() { return Err(EmptyTopics) } + let mut rng = { let mut thread_rng = ::rand::thread_rng(); @@ -254,10 +279,8 @@ impl Message { let start_digest = { let mut stream = RlpStream::new_list(4); - stream.append(&expiry) - .append(¶ms.ttl) - .append_list(¶ms.topics) - .append(¶ms.payload); + stream.append(&expiry).append(¶ms.ttl); + append_topics(&mut stream, ¶ms.topics).append(¶ms.payload); let mut digest = Keccak::new_keccak256(); digest.update(&*stream.drain()); @@ -300,12 +323,12 @@ impl Message { let encoded = ::rlp::encode(&envelope); - Message::from_components( + Ok(Message::from_components( envelope, encoded.len(), H256(keccak256(&encoded)), SystemTime::now(), - ).expect("Message generated here known to be valid; qed") + ).expect("Message generated here known to be valid; qed")) } /// Decode message from RLP and check for validity against system time. @@ -327,6 +350,8 @@ impl Message { if envelope.expiry <= envelope.ttl { return Err(Error::LivesTooLong) } if envelope.ttl == 0 { return Err(Error::ZeroTTL) } + if envelope.topics.is_empty() { return Err(Error::EmptyTopics) } + let issue_time_adjusted = Duration::from_secs( (envelope.expiry - envelope.ttl).saturating_sub(LEEWAY_SECONDS) ); @@ -394,6 +419,7 @@ mod tests { use super::*; use std::time::{self, Duration, SystemTime}; use rlp::UntrustedRlp; + use smallvec::SmallVec; fn unix_time(x: u64) -> SystemTime { time::UNIX_EPOCH + Duration::from_secs(x) @@ -401,12 +427,12 @@ mod tests { #[test] fn create_message() { - let _ = Message::create(CreateParams { + assert!(Message::create(CreateParams { ttl: 100, payload: vec![1, 2, 3, 4], - topics: Vec::new(), + topics: vec![Topic([1, 2, 1, 2])], work: 50, - }); + }).is_ok()); } #[test] @@ -415,7 +441,23 @@ mod tests { expiry: 100_000, ttl: 30, data: vec![9; 256], - topics: Default::default(), + topics: SmallVec::from_slice(&[Default::default()]), + nonce: 1010101, + }; + + let encoded = ::rlp::encode(&envelope); + let decoded = ::rlp::decode(&encoded); + + assert_eq!(envelope, decoded) + } + + #[test] + fn round_trip_multitopic() { + let envelope = Envelope { + expiry: 100_000, + ttl: 30, + data: vec![9; 256], + topics: SmallVec::from_slice(&[Default::default(), Topic([1, 2, 3, 4])]), nonce: 1010101, }; @@ -431,7 +473,7 @@ mod tests { expiry: 100_000, ttl: 30, data: vec![9; 256], - topics: Default::default(), + topics: SmallVec::from_slice(&[Default::default()]), nonce: 1010101, }; @@ -450,7 +492,7 @@ mod tests { expiry: 100_000, ttl: 30, data: vec![9; 256], - topics: Default::default(), + topics: SmallVec::from_slice(&[Default::default()]), nonce: 1010101, }; @@ -467,7 +509,7 @@ mod tests { expiry: 100_000, ttl: 200_000, data: vec![9; 256], - topics: Default::default(), + topics: SmallVec::from_slice(&[Default::default()]), nonce: 1010101, }; diff --git a/whisper/src/net.rs b/whisper/src/net.rs index 5553f27e6..dab32ad2c 100644 --- a/whisper/src/net.rs +++ b/whisper/src/net.rs @@ -23,31 +23,45 @@ use std::time::{Duration, SystemTime}; use std::sync::Arc; use bigint::hash::{H256, H512}; -use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, TimerToken}; +use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, ProtocolId, TimerToken}; use ordered_float::OrderedFloat; use parking_lot::{Mutex, RwLock}; use rlp::{DecoderError, RlpStream, UntrustedRlp}; use message::{Message, Error as MessageError}; +// how often periodic relays are. when messages are imported +// we directly broadcast. const RALLY_TOKEN: TimerToken = 1; -const RALLY_TIMEOUT_MS: u64 = 750; // supposed to be at least once per second. +const RALLY_TIMEOUT_MS: u64 = 2500; -const PROTOCOL_VERSION: usize = 2; +/// Current protocol version. +pub const PROTOCOL_VERSION: usize = 6; /// Supported protocol versions. pub const SUPPORTED_VERSIONS: &'static [u8] = &[PROTOCOL_VERSION as u8]; // maximum tolerated delay between messages packets. -const MAX_TOLERATED_DELAY_MS: u64 = 2000; +const MAX_TOLERATED_DELAY_MS: u64 = 5000; -/// Number of packets. -pub const PACKET_COUNT: u8 = 3; +/// Number of packets. A bunch are reserved. +pub const PACKET_COUNT: u8 = 128; + +/// Whisper protocol ID +pub const PROTOCOL_ID: ::network::ProtocolId = *b"shh"; + +/// Parity-whisper protocol ID +/// Current parity-specific extensions: +/// - Multiple topics in packet. +pub const PARITY_PROTOCOL_ID: ::network::ProtocolId = *b"pwh"; mod packet { pub const STATUS: u8 = 0; pub const MESSAGES: u8 = 1; - pub const TOPIC_FILTER: u8 = 2; + pub const POW_REQUIREMENT: u8 = 2; + pub const TOPIC_FILTER: u8 = 3; + + // 126, 127 for mail server stuff we will never implement here. } /// Handles messages within a single packet. @@ -67,11 +81,9 @@ enum Error { Decoder(DecoderError), Network(NetworkError), Message(MessageError), - UnknownPacket(u8), UnknownPeer(PeerId), - ProtocolVersionMismatch(usize), - SameNodeKey, UnexpectedMessage, + InvalidPowReq, } impl From for Error { @@ -98,12 +110,9 @@ impl fmt::Display for Error { Error::Decoder(ref err) => write!(f, "Failed to decode packet: {}", err), Error::Network(ref err) => write!(f, "Network error: {}", err), Error::Message(ref err) => write!(f, "Error decoding message: {}", err), - Error::UnknownPacket(ref id) => write!(f, "Unknown packet kind: {}", id), Error::UnknownPeer(ref id) => write!(f, "Message received from unknown peer: {}", id), - Error::ProtocolVersionMismatch(ref proto) => - write!(f, "Unknown protocol version: {}", proto), Error::UnexpectedMessage => write!(f, "Unexpected message."), - Error::SameNodeKey => write!(f, "Peer and us have same node key."), + Error::InvalidPowReq => write!(f, "Peer sent invalid PoW requirement."), } } } @@ -298,15 +307,18 @@ impl Messages { enum State { Unconfirmed(SystemTime), // awaiting status packet. - TheirTurn(SystemTime), // it has been their turn to send since stored time. - OurTurn, + Confirmed, } +#[allow(dead_code)] // for node key. this will be useful for topic routing. struct Peer { node_key: NodeId, state: State, known_messages: HashSet, topic_filter: Option, + pow_requirement: f64, + is_parity: bool, + _protocol_version: usize, } impl Peer { @@ -319,12 +331,14 @@ impl Peer { // whether this peer will accept the message. fn will_accept(&self, message: &Message) -> bool { - let known = self.known_messages.contains(message.hash()); + if self.known_messages.contains(message.hash()) { return false } - let matches_bloom = self.topic_filter.as_ref() - .map_or(true, |topic| topic & message.bloom() == message.bloom().clone()); + // only parity peers will accept multitopic messages. + if message.envelope().is_multitopic() && !self.is_parity { return false } + if message.work_proved() < self.pow_requirement { return false } - !known && matches_bloom + self.topic_filter.as_ref() + .map_or(true, |filter| &(filter & message.bloom()) == message.bloom()) } // note a message as known. returns true if it was already @@ -337,10 +351,14 @@ impl Peer { self.topic_filter = Some(topic); } + fn set_pow_requirement(&mut self, pow_requirement: f64) { + self.pow_requirement = pow_requirement; + } + fn can_send_messages(&self) -> bool { match self.state { - State::Unconfirmed(_) | State::OurTurn => false, - State::TheirTurn(_) => true, + State::Unconfirmed(_) => false, + State::Confirmed => true, } } } @@ -357,21 +375,41 @@ pub struct PoolStatus { pub target_size: usize, } -/// Handle to the pool, for posting messages or getting info. -#[derive(Clone)] -pub struct PoolHandle { - messages: Arc>, +/// Generic network context. +pub trait Context { + /// Disconnect a peer. + fn disconnect_peer(&self, PeerId); + /// Disable a peer. + fn disable_peer(&self, PeerId); + /// Get a peer's node key. + fn node_key(&self, PeerId) -> Option; + /// Get a peer's protocol version for given protocol. + fn protocol_version(&self, ProtocolId, PeerId) -> Option; + /// Send message to peer. + fn send(&self, PeerId, u8, Vec); } -impl PoolHandle { - /// Post a message to the whisper network to be relayed. - pub fn post_message(&self, message: Message) -> bool { - self.messages.write().insert(message) +impl<'a> Context for NetworkContext<'a> { + fn disconnect_peer(&self, peer: PeerId) { + NetworkContext::disconnect_peer(self, peer); + } + fn disable_peer(&self, peer: PeerId) { + NetworkContext::disable_peer(self, peer) + } + fn node_key(&self, peer: PeerId) -> Option { + self.session_info(peer).and_then(|info| info.id) + } + fn protocol_version(&self, proto_id: ProtocolId, peer: PeerId) -> Option { + NetworkContext::protocol_version(self, proto_id, peer) } - /// Get number of messages and amount of memory used by them. - pub fn pool_status(&self) -> PoolStatus { - self.messages.read().status() + fn send(&self, peer: PeerId, packet_id: u8, message: Vec) { + if let Err(e) = NetworkContext::send(self, peer, packet_id, message) { + debug!(target: "whisper", "Failed to send packet {} to peer {}: {}", + packet_id, peer, e); + + self.disconnect_peer(peer) + } } } @@ -395,15 +433,23 @@ impl Network { } } - /// Acquire a sender to asynchronously feed messages to the whisper - /// network. - pub fn handle(&self) -> PoolHandle { - PoolHandle { messages: self.messages.clone() } + /// Post a message to the whisper network to be relayed. + pub fn post_message(&self, message: Message, context: &C) -> bool + where T: MessageHandler + { + let ok = self.messages.write().insert(message); + if ok { self.rally(context) } + ok + } + + /// Get number of messages and amount of memory used by them. + pub fn pool_status(&self) -> PoolStatus { + self.messages.read().status() } } impl Network { - fn rally(&self, io: &NetworkContext) { + fn rally(&self, io: &C) { // cannot be greater than 16MB (protocol limitation) const MAX_MESSAGES_PACKET_SIZE: usize = 8 * 1024 * 1024; @@ -428,11 +474,11 @@ impl Network { // check timeouts and skip peers who we can't send a rally to. match peer_data.state { - State::Unconfirmed(ref time) | State::TheirTurn(ref time) => { + State::Unconfirmed(ref time) => { punish_timeout(time); continue; } - State::OurTurn => {} + State::Confirmed => {} } // construct packet, skipping messages the peer won't accept. @@ -452,39 +498,19 @@ impl Network { stream.complete_unbounded_list(); - peer_data.state = State::TheirTurn(SystemTime::now()); - if let Err(e) = io.send(*peer_id, packet::MESSAGES, stream.out()) { - debug!(target: "whisper", "Failed to send messages packet to peer {}: {}", peer_id, e); - io.disconnect_peer(*peer_id); - } + io.send(*peer_id, packet::MESSAGES, stream.out()); } } // handle status packet from peer. - fn on_status(&self, peer: &PeerId, status: UntrustedRlp) + fn on_status(&self, peer: &PeerId, _status: UntrustedRlp) -> Result<(), Error> { - let proto: usize = status.as_val()?; - if proto != PROTOCOL_VERSION { return Err(Error::ProtocolVersionMismatch(proto)) } - let peers = self.peers.read(); + match peers.get(peer) { Some(peer) => { - let mut peer = peer.lock(); - let our_node_key = self.node_key.read().clone(); - - // handle this basically impossible edge case gracefully. - if peer.node_key == our_node_key { - return Err(Error::SameNodeKey); - } - - // peer with lower node key begins the rally. - if peer.node_key > our_node_key { - peer.state = State::OurTurn; - } else { - peer.state = State::TheirTurn(SystemTime::now()); - } - + peer.lock().state = State::Confirmed; Ok(()) } None => { @@ -513,8 +539,6 @@ impl Network { return Err(Error::UnexpectedMessage); } - peer.state = State::OurTurn; - let now = SystemTime::now(); let mut messages_vec = message_packet.iter().map(|rlp| Message::decode(rlp, now)) .collect::, _>>()?; @@ -541,6 +565,42 @@ impl Network { Ok(()) } + fn on_pow_requirement(&self, peer: &PeerId, requirement: UntrustedRlp) + -> Result<(), Error> + { + use byteorder::{ByteOrder, BigEndian}; + + let peers = self.peers.read(); + match peers.get(peer) { + Some(peer) => { + let mut peer = peer.lock(); + + if let State::Unconfirmed(_) = peer.state { + return Err(Error::UnexpectedMessage); + } + let bytes: Vec = requirement.as_val()?; + if bytes.len() != ::std::mem::size_of::() { + return Err(Error::InvalidPowReq); + } + + // as of byteorder 1.1.0, this is always defined. + let req = BigEndian::read_f64(&bytes[..]); + + if !req.is_normal() { + return Err(Error::InvalidPowReq); + } + + peer.set_pow_requirement(req); + } + None => { + debug!(target: "whisper", "Received message from unknown peer."); + return Err(Error::UnknownPeer(*peer)); + } + } + + Ok(()) + } + fn on_topic_filter(&self, peer: &PeerId, filter: UntrustedRlp) -> Result<(), Error> { @@ -564,10 +624,10 @@ impl Network { Ok(()) } - fn on_connect(&self, io: &NetworkContext, peer: &PeerId) { + fn on_connect(&self, io: &C, peer: &PeerId) { trace!(target: "whisper", "Connecting peer {}", peer); - let node_key = match io.session_info(*peer).and_then(|info| info.id) { + let node_key = match io.node_key(*peer) { Some(node_key) => node_key, None => { debug!(target: "whisper", "Disconnecting peer {}, who has no node key.", peer); @@ -576,17 +636,25 @@ impl Network { } }; + let version = match io.protocol_version(PROTOCOL_ID, *peer) { + Some(version) => version as usize, + None => { + io.disable_peer(*peer); + return + } + }; + self.peers.write().insert(*peer, Mutex::new(Peer { node_key: node_key, state: State::Unconfirmed(SystemTime::now()), known_messages: HashSet::new(), topic_filter: None, + pow_requirement: 0f64, + is_parity: io.protocol_version(PARITY_PROTOCOL_ID, *peer).is_some(), + _protocol_version: version, })); - if let Err(e) = io.send(*peer, packet::STATUS, ::rlp::encode(&PROTOCOL_VERSION).to_vec()) { - debug!(target: "whisper", "Error sending status: {}", e); - io.disconnect_peer(*peer); - } + io.send(*peer, packet::STATUS, ::rlp::EMPTY_LIST_RLP.to_vec()); } fn on_disconnect(&self, peer: &PeerId) { @@ -609,8 +677,9 @@ impl ::network::NetworkProtocolHandler for Network { let res = match packet_id { packet::STATUS => self.on_status(peer, rlp), packet::MESSAGES => self.on_messages(peer, rlp), + packet::POW_REQUIREMENT => self.on_pow_requirement(peer, rlp), packet::TOPIC_FILTER => self.on_topic_filter(peer, rlp), - other => Err(Error::UnknownPacket(other)), + _ => Ok(()), // ignore unknown packets. }; if let Err(e) = res { @@ -636,3 +705,19 @@ impl ::network::NetworkProtocolHandler for Network { } } } + +/// Dummy subprotocol used for parity extensions. +#[derive(Debug, Copy, Clone)] +pub struct ParityExtensions; + +impl ::network::NetworkProtocolHandler for ParityExtensions { + fn initialize(&self, _io: &NetworkContext, _host_info: &HostInfo) { } + + fn read(&self, _io: &NetworkContext, _peer: &PeerId, _id: u8, _msg: &[u8]) { } + + fn connected(&self, _io: &NetworkContext, _peer: &PeerId) { } + + fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { } + + fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) { } +} diff --git a/whisper/src/rpc/filter.rs b/whisper/src/rpc/filter.rs index 2b0c7544d..f4dd7bb9a 100644 --- a/whisper/src/rpc/filter.rs +++ b/whisper/src/rpc/filter.rs @@ -307,7 +307,7 @@ impl Filter { #[cfg(test)] mod tests { - use message::{CreateParams, Message}; + use message::{CreateParams, Message, Topic}; use rpc::types::{FilterRequest, HexEncode}; use rpc::abridge_topic; use super::*; @@ -325,38 +325,40 @@ mod tests { #[test] fn basic_match() { - let topics = vec![vec![1, 2, 3], vec![4, 5, 6]]; + let topics = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]]; + let abridged_topics: Vec<_> = topics.iter().map(|x| abridge_topic(&x)).collect(); + let req = FilterRequest { decrypt_with: Default::default(), from: None, - topics: topics.iter().cloned().map(HexEncode).collect(), + topics: topics.into_iter().map(HexEncode).collect(), }; let filter = Filter::new(req).unwrap(); let message = Message::create(CreateParams { ttl: 100, payload: vec![1, 3, 5, 7, 9], - topics: topics.iter().map(|x| abridge_topic(&x)).collect(), + topics: abridged_topics.clone(), work: 0, - }); + }).unwrap(); assert!(filter.basic_matches(&message)); let message = Message::create(CreateParams { ttl: 100, payload: vec![1, 3, 5, 7, 9], - topics: topics.iter().take(1).map(|x| abridge_topic(&x)).collect(), + topics: abridged_topics.clone(), work: 0, - }); + }).unwrap(); assert!(filter.basic_matches(&message)); let message = Message::create(CreateParams { ttl: 100, payload: vec![1, 3, 5, 7, 9], - topics: Vec::new(), + topics: vec![Topic([1, 8, 3, 99])], work: 0, - }); + }).unwrap(); assert!(!filter.basic_matches(&message)); } @@ -366,6 +368,9 @@ mod tests { use rpc::payload::{self, EncodeParams}; use rpc::key_store::{Key, KeyStore}; + let topics = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]]; + let abridged_topics: Vec<_> = topics.iter().map(|x| abridge_topic(&x)).collect(); + let mut store = KeyStore::new().unwrap(); let signing_pair = Key::new_asymmetric(store.rng()); let encrypting_key = Key::new_symmetric(store.rng()); @@ -386,24 +391,25 @@ mod tests { let message = Message::create(CreateParams { ttl: 100, payload: encrypted, - topics: vec![abridge_topic(&[9; 32])], + topics: abridged_topics.clone(), work: 0, - }); + }).unwrap(); let message2 = Message::create(CreateParams { ttl: 100, payload: vec![3, 5, 7, 9], - topics: vec![abridge_topic(&[9; 32])], + topics: abridged_topics, work: 0, - }); + }).unwrap(); let filter = Filter::new(FilterRequest { decrypt_with: Some(HexEncode(decrypt_id)), from: Some(HexEncode(signing_pair.public().unwrap().clone())), - topics: vec![HexEncode(vec![9; 32])], + topics: topics.into_iter().map(HexEncode).collect(), }).unwrap(); assert!(filter.basic_matches(&message)); + assert!(filter.basic_matches(&message2)); let items = ::std::cell::Cell::new(0); let on_match = |_| { items.set(items.get() + 1); }; diff --git a/whisper/src/rpc/mod.rs b/whisper/src/rpc/mod.rs index 6cec20c69..ed47ada15 100644 --- a/whisper/src/rpc/mod.rs +++ b/whisper/src/rpc/mod.rs @@ -155,16 +155,6 @@ pub trait PoolHandle: Send + Sync { fn pool_status(&self) -> ::net::PoolStatus; } -impl PoolHandle for ::net::PoolHandle { - fn relay(&self, message: Message) -> bool { - self.post_message(message) - } - - fn pool_status(&self) -> ::net::PoolStatus { - ::net::PoolHandle::pool_status(self) - } -} - /// Default, simple metadata implementation. #[derive(Clone, Default)] pub struct Meta { @@ -339,7 +329,7 @@ impl Whisper for WhisperClien payload: encrypted, topics: req.topics.into_iter().map(|x| abridge_topic(&x.into_inner())).collect(), work: req.priority, - }); + }).map_err(|_| whisper_error("Empty topics"))?; if !self.pool.relay(message) { Err(whisper_error("PoW too low to compete with other messages")) diff --git a/whisper/src/rpc/types.rs b/whisper/src/rpc/types.rs index ac06d69d5..9b2d53218 100644 --- a/whisper/src/rpc/types.rs +++ b/whisper/src/rpc/types.rs @@ -221,7 +221,7 @@ pub struct FilterItem { /// Time to live in seconds. pub ttl: u64, - /// Abridged topics that matched the filter. + /// Topics that matched the filter. pub topics: Vec, /// Unix timestamp of the message generation.