Compatibility with whisper v6 (#6179)

* compatibility with whisper v6

* separate subprotocol for parity extensions

* kill version field
This commit is contained in:
Robert Habermeier 2017-09-10 18:02:14 +02:00 committed by Gav Wood
parent 246b5282e5
commit 375668bc40
12 changed files with 349 additions and 172 deletions

37
Cargo.lock generated
View File

@ -2,7 +2,7 @@
name = "wasm"
version = "0.1.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bigint 0.1.3",
"ethcore-logger 1.8.0",
"ethcore-util 1.8.0",
@ -129,7 +129,7 @@ name = "bigint"
version = "4.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"heapsize 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -141,7 +141,7 @@ name = "bincode"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"num-traits 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -210,7 +210,7 @@ name = "bn"
version = "0.4.4"
source = "git+https://github.com/paritytech/bn#b97e95a45f4484a41a515338c4f0e093bf6675e0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -222,7 +222,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "byteorder"
version = "1.0.0"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
@ -230,7 +230,7 @@ name = "bytes"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"iovec 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -507,7 +507,7 @@ dependencies = [
"bloomable 0.1.0",
"bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bn 0.4.4 (git+https://github.com/paritytech/bn)",
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"clippy 0.0.103 (registry+https://github.com/rust-lang/crates.io-index)",
"common-types 0.1.0",
"crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
@ -742,7 +742,7 @@ dependencies = [
name = "ethcore-secretstore"
version = "1.0.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore 1.8.0",
"ethcore-bigint 0.1.3",
@ -855,7 +855,7 @@ dependencies = [
name = "ethkey"
version = "0.2.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"eth-secp256k1 0.5.6 (git+https://github.com/paritytech/rust-secp256k1)",
"ethcore-bigint 0.1.3",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -948,7 +948,7 @@ name = "evm"
version = "0.1.0"
dependencies = [
"bit-set 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"common-types 0.1.0",
"ethcore-bigint 0.1.3",
"ethcore-logger 1.8.0",
@ -1421,7 +1421,7 @@ name = "libflate"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1634,7 +1634,7 @@ dependencies = [
name = "native-contracts"
version = "0.1.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethabi 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bigint 0.1.3",
"futures 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2062,6 +2062,7 @@ dependencies = [
"ethcore-ipc 1.8.0",
"ethcore-light 1.8.0",
"ethcore-logger 1.8.0",
"ethcore-network 1.8.0",
"ethcore-util 1.8.0",
"ethcrypto 0.1.0",
"ethjson 0.1.0",
@ -2190,7 +2191,7 @@ name = "parity-wasm"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2200,7 +2201,7 @@ name = "parity-whisper"
version = "0.1.0"
dependencies = [
"bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bigint 0.1.3",
"ethcore-network 1.8.0",
"ethcrypto 0.1.0",
@ -2504,7 +2505,7 @@ dependencies = [
name = "rlp"
version = "0.2.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"elastic-array 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ethcore-bigint 0.1.3",
"lazy_static 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3257,7 +3258,7 @@ dependencies = [
name = "vm"
version = "0.1.0"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"common-types 0.1.0",
"ethcore-bigint 0.1.3",
"ethcore-util 1.8.0",
@ -3301,7 +3302,7 @@ name = "ws"
version = "0.7.1"
source = "git+https://github.com/tomusdrw/ws-rs#f8306a798b7541d64624299a83a2c934f173beed"
dependencies = [
"byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"httparse 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -3380,7 +3381,7 @@ dependencies = [
"checksum bloomchain 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3f421095d2a76fc24cd3fb3f912b90df06be7689912b1bdb423caefae59c258d"
"checksum bn 0.4.4 (git+https://github.com/paritytech/bn)" = "<none>"
"checksum byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0fc10e8cc6b2580fda3f36eb6dc5316657f812a3df879a44a66fc9f0fdbc4855"
"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8"
"checksum byteorder 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ff81738b726f5d099632ceaffe7fb65b90212e8dce59d518729e7e8634032d3d"
"checksum bytes 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8b24f16593f445422331a5eed46b72f7f171f910fead4f2ea8f17e727e9c5c14"
"checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
"checksum cid 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "34aa7da06f10541fbca6850719cdaa8fa03060a5d2fb33840f149cf8133a00c7"

View File

@ -354,14 +354,14 @@ impl FullDependencies {
},
Api::Whisper => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
let whisper = whisper_rpc.make_handler(self.net.clone());
handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper));
}
}
Api::WhisperPubSub => {
if !for_generic_pubsub {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
let whisper = whisper_rpc.make_handler(self.net.clone());
handler.extend_with(
::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper)
);
@ -554,13 +554,13 @@ impl<C: LightChainClient + 'static> LightDependencies<C> {
},
Api::Whisper => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
let whisper = whisper_rpc.make_handler(self.net.clone());
handler.extend_with(::parity_whisper::rpc::Whisper::to_delegate(whisper));
}
}
Api::WhisperPubSub => {
if let Some(ref whisper_rpc) = self.whisper_rpc {
let whisper = whisper_rpc.make_handler();
let whisper = whisper_rpc.make_handler(self.net.clone());
handler.extend_with(::parity_whisper::rpc::WhisperPubSub::to_delegate(whisper));
}
}

View File

@ -246,10 +246,8 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let mut attached_protos = Vec::new();
let whisper_factory = if cmd.whisper.enabled {
let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size)
let whisper_factory = ::whisper::setup(cmd.whisper.target_message_pool_size, &mut attached_protos)
.map_err(|e| format!("Failed to initialize whisper: {}", e))?;
attached_protos.push(whisper_net);
whisper_factory
} else {
None
@ -638,10 +636,9 @@ pub fn execute(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) -> R
let mut attached_protos = Vec::new();
let whisper_factory = if cmd.whisper.enabled {
let (whisper_net, whisper_factory) = ::whisper::setup(cmd.whisper.target_message_pool_size)
let whisper_factory = ::whisper::setup(cmd.whisper.target_message_pool_size, &mut attached_protos)
.map_err(|e| format!("Failed to initialize whisper: {}", e))?;
attached_protos.push(whisper_net);
whisper_factory
} else {
None

View File

@ -17,10 +17,11 @@
use std::sync::Arc;
use std::io;
use ethsync::AttachedProtocol;
use ethsync::{AttachedProtocol, ManageNetwork};
use parity_rpc::Metadata;
use parity_whisper::net::{self as whisper_net, PoolHandle, Network as WhisperNetwork};
use parity_whisper::rpc::{WhisperClient, FilterManager};
use parity_whisper::message::Message;
use parity_whisper::net::{self as whisper_net, Network as WhisperNetwork};
use parity_whisper::rpc::{WhisperClient, PoolHandle, FilterManager};
/// Whisper config.
#[derive(Debug, PartialEq, Eq)]
@ -38,6 +39,31 @@ impl Default for Config {
}
}
/// Standard pool handle.
pub struct NetPoolHandle {
/// Pool handle.
handle: Arc<WhisperNetwork<Arc<FilterManager>>>,
/// Network manager.
net: Arc<ManageNetwork>,
}
impl PoolHandle for NetPoolHandle {
fn relay(&self, message: Message) -> bool {
let mut res = false;
let mut message = Some(message);
self.net.with_proto_context(whisper_net::PROTOCOL_ID, &mut move |ctx| {
if let Some(message) = message.take() {
res = self.handle.post_message(message, ctx);
}
});
res
}
fn pool_status(&self) -> whisper_net::PoolStatus {
self.handle.pool_status()
}
}
/// Factory for standard whisper RPC.
pub struct RpcFactory {
net: Arc<WhisperNetwork<Arc<FilterManager>>>,
@ -45,8 +71,9 @@ pub struct RpcFactory {
}
impl RpcFactory {
pub fn make_handler(&self) -> WhisperClient<PoolHandle, Metadata> {
WhisperClient::new(self.net.handle(), self.manager.clone())
pub fn make_handler(&self, net: Arc<ManageNetwork>) -> WhisperClient<NetPoolHandle, Metadata> {
let handle = NetPoolHandle { handle: self.net.clone(), net: net };
WhisperClient::new(handle, self.manager.clone())
}
}
@ -54,24 +81,36 @@ impl RpcFactory {
///
/// Will target the given pool size.
#[cfg(not(feature = "ipc"))]
pub fn setup(target_pool_size: usize) -> io::Result<(AttachedProtocol, Option<RpcFactory>)> {
pub fn setup(target_pool_size: usize, protos: &mut Vec<AttachedProtocol>)
-> io::Result<Option<RpcFactory>>
{
let manager = Arc::new(FilterManager::new()?);
let net = Arc::new(WhisperNetwork::new(target_pool_size, manager.clone()));
let proto = AttachedProtocol {
protos.push(AttachedProtocol {
handler: net.clone() as Arc<_>,
packet_count: whisper_net::PACKET_COUNT,
versions: whisper_net::SUPPORTED_VERSIONS,
protocol_id: *b"shh",
};
protocol_id: whisper_net::PROTOCOL_ID,
});
// parity-only extensions to whisper.
protos.push(AttachedProtocol {
handler: Arc::new(whisper_net::ParityExtensions),
packet_count: whisper_net::PACKET_COUNT,
versions: whisper_net::SUPPORTED_VERSIONS,
protocol_id: whisper_net::PARITY_PROTOCOL_ID,
});
let factory = RpcFactory { net: net, manager: manager };
Ok((proto, Some(factory)))
Ok(Some(factory))
}
// TODO: make it possible to attach generic protocols in IPC.
#[cfg(feature = "ipc")]
pub fn setup(_pool: usize) -> (AttachedProtocol, Option<RpcFactory>) {
Ok((AttachedProtocol, None))
pub fn setup(_target_pool_size: usize, _protos: &mut Vec<AttachedProtocol>)
-> io::Result<Option<RpcFactory>>
{
Ok(None)
}

View File

@ -62,5 +62,8 @@ hash = { path = "../util/hash" }
clippy = { version = "0.0.103", optional = true}
pretty_assertions = "0.1"
[dev-dependencies]
ethcore-network = { path = "../util/network" }
[features]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev"]

View File

@ -15,6 +15,9 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
use ethsync::{ManageNetwork, NetworkConfiguration};
use self::ethcore_network::{ProtocolId, NetworkContext};
extern crate ethcore_network;
pub struct TestManageNetwork;
@ -27,4 +30,5 @@ impl ManageNetwork for TestManageNetwork {
fn start_network(&self) {}
fn stop_network(&self) {}
fn network_config(&self) -> NetworkConfiguration { NetworkConfiguration::new_local() }
fn with_proto_context(&self, _: ProtocolId, _: &mut FnMut(&NetworkContext)) { }
}

View File

@ -497,6 +497,8 @@ pub trait ManageNetwork : Send + Sync {
fn stop_network(&self);
/// Query the current configuration of the network
fn network_config(&self) -> NetworkConfiguration;
/// Get network context for protocol.
fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext));
}
@ -538,6 +540,10 @@ impl ManageNetwork for EthSync {
fn network_config(&self) -> NetworkConfiguration {
NetworkConfiguration::from(self.network.config().clone())
}
fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext)) {
self.network.with_context_eval(proto, f);
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
@ -808,6 +814,10 @@ impl ManageNetwork for LightSync {
fn network_config(&self) -> NetworkConfiguration {
NetworkConfiguration::from(self.network.config().clone())
}
fn with_proto_context(&self, proto: ProtocolId, f: &mut FnMut(&NetworkContext)) {
self.network.with_context_eval(proto, f);
}
}
impl LightSyncProvider for LightSync {

View File

@ -56,23 +56,18 @@ impl Topic {
/// this takes 3 sets of 9 bits, treating each as an index in the range
/// 0..512 into the bloom and setting the corresponding bit in the bloom to 1.
pub fn bloom_into(&self, bloom: &mut H512) {
let mut set_bit = |idx: usize| {
let idx = idx & 511;
bloom[idx / 8] |= 1 << idx % 8;
};
let data = &self.0;
let mut combined = ((data[0] as usize) << 24) |
((data[1] as usize) << 16) |
((data[2] as usize) << 8) |
data[3] as usize;
for i in 0..3 {
let mut idx = data[i] as usize;
// take off the last 5 bits as we only use 27.
combined >>= 5;
if data[3] & (1 << i) != 0 {
idx += 256;
}
set_bit(combined);
set_bit(combined >> 9);
set_bit(combined >> 18);
debug_assert!(idx <= 511);
bloom[idx / 8] |= 1 << (7 - idx % 8);
}
}
/// Get bloom for single topic.
@ -118,6 +113,7 @@ pub fn bloom_topics(topics: &[Topic]) -> H512 {
#[derive(Debug)]
pub enum Error {
Decoder(DecoderError),
EmptyTopics,
LivesTooLong,
IssuedInFuture,
ZeroTTL,
@ -136,10 +132,27 @@ impl fmt::Display for Error {
Error::LivesTooLong => write!(f, "Message claims to be issued before the unix epoch."),
Error::IssuedInFuture => write!(f, "Message issued in future."),
Error::ZeroTTL => write!(f, "Message live for zero time."),
Error::EmptyTopics => write!(f, "Message has no topics."),
}
}
}
fn append_topics<'a>(s: &'a mut RlpStream, topics: &[Topic]) -> &'a mut RlpStream {
if topics.len() == 1 {
s.append(&topics[0])
} else {
s.append_list(&topics)
}
}
fn decode_topics(rlp: UntrustedRlp) -> Result<SmallVec<[Topic; 4]>, DecoderError> {
if rlp.is_list() {
rlp.iter().map(|r| r.as_val::<Topic>()).collect()
} else {
rlp.as_val().map(|t| SmallVec::from_slice(&[t]))
}
}
// Raw envelope struct.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Envelope {
@ -156,15 +169,20 @@ pub struct Envelope {
}
impl Envelope {
/// Whether the message is multi-topic. Only relay these to Parity peers.
pub fn is_multitopic(&self) -> bool {
self.topics.len() != 1
}
fn proving_hash(&self) -> H256 {
use byteorder::{BigEndian, ByteOrder};
let mut buf = [0; 32];
let mut stream = RlpStream::new_list(4);
stream.append(&self.expiry)
.append(&self.ttl)
.append_list(&self.topics)
stream.append(&self.expiry).append(&self.ttl);
append_topics(&mut stream, &self.topics)
.append(&self.data);
let mut digest = Keccak::new_keccak256();
@ -185,8 +203,9 @@ impl rlp::Encodable for Envelope {
fn rlp_append(&self, s: &mut RlpStream) {
s.begin_list(5)
.append(&self.expiry)
.append(&self.ttl)
.append_list(&self.topics)
.append(&self.ttl);
append_topics(s, &self.topics)
.append(&self.data)
.append(&self.nonce);
}
@ -199,13 +218,17 @@ impl rlp::Decodable for Envelope {
Ok(Envelope {
expiry: rlp.val_at(0)?,
ttl: rlp.val_at(1)?,
topics: rlp.at(2)?.iter().map(|x| x.as_val()).collect::<Result<_, _>>()?,
topics: decode_topics(rlp.at(2)?)?,
data: rlp.val_at(3)?,
nonce: rlp.val_at(4)?,
})
}
}
/// Error indicating no topics.
#[derive(Debug, Copy, Clone)]
pub struct EmptyTopics;
/// Message creation parameters.
/// Pass this to `Message::create` to make a message.
pub struct CreateParams {
@ -213,7 +236,7 @@ pub struct CreateParams {
pub ttl: u64,
/// payload data.
pub payload: Vec<u8>,
/// Topics.
/// Topics. May not be empty.
pub topics: Vec<Topic>,
/// How many milliseconds to spend proving work.
pub work: u64,
@ -231,10 +254,12 @@ pub struct Message {
impl Message {
/// Create a message from creation parameters.
/// Panics if TTL is 0.
pub fn create(params: CreateParams) -> Self {
pub fn create(params: CreateParams) -> Result<Self, EmptyTopics> {
use byteorder::{BigEndian, ByteOrder};
use rand::{Rng, SeedableRng, XorShiftRng};
if params.topics.is_empty() { return Err(EmptyTopics) }
let mut rng = {
let mut thread_rng = ::rand::thread_rng();
@ -254,10 +279,8 @@ impl Message {
let start_digest = {
let mut stream = RlpStream::new_list(4);
stream.append(&expiry)
.append(&params.ttl)
.append_list(&params.topics)
.append(&params.payload);
stream.append(&expiry).append(&params.ttl);
append_topics(&mut stream, &params.topics).append(&params.payload);
let mut digest = Keccak::new_keccak256();
digest.update(&*stream.drain());
@ -300,12 +323,12 @@ impl Message {
let encoded = ::rlp::encode(&envelope);
Message::from_components(
Ok(Message::from_components(
envelope,
encoded.len(),
H256(keccak256(&encoded)),
SystemTime::now(),
).expect("Message generated here known to be valid; qed")
).expect("Message generated here known to be valid; qed"))
}
/// Decode message from RLP and check for validity against system time.
@ -327,6 +350,8 @@ impl Message {
if envelope.expiry <= envelope.ttl { return Err(Error::LivesTooLong) }
if envelope.ttl == 0 { return Err(Error::ZeroTTL) }
if envelope.topics.is_empty() { return Err(Error::EmptyTopics) }
let issue_time_adjusted = Duration::from_secs(
(envelope.expiry - envelope.ttl).saturating_sub(LEEWAY_SECONDS)
);
@ -394,6 +419,7 @@ mod tests {
use super::*;
use std::time::{self, Duration, SystemTime};
use rlp::UntrustedRlp;
use smallvec::SmallVec;
fn unix_time(x: u64) -> SystemTime {
time::UNIX_EPOCH + Duration::from_secs(x)
@ -401,12 +427,12 @@ mod tests {
#[test]
fn create_message() {
let _ = Message::create(CreateParams {
assert!(Message::create(CreateParams {
ttl: 100,
payload: vec![1, 2, 3, 4],
topics: Vec::new(),
topics: vec![Topic([1, 2, 1, 2])],
work: 50,
});
}).is_ok());
}
#[test]
@ -415,7 +441,23 @@ mod tests {
expiry: 100_000,
ttl: 30,
data: vec![9; 256],
topics: Default::default(),
topics: SmallVec::from_slice(&[Default::default()]),
nonce: 1010101,
};
let encoded = ::rlp::encode(&envelope);
let decoded = ::rlp::decode(&encoded);
assert_eq!(envelope, decoded)
}
#[test]
fn round_trip_multitopic() {
let envelope = Envelope {
expiry: 100_000,
ttl: 30,
data: vec![9; 256],
topics: SmallVec::from_slice(&[Default::default(), Topic([1, 2, 3, 4])]),
nonce: 1010101,
};
@ -431,7 +473,7 @@ mod tests {
expiry: 100_000,
ttl: 30,
data: vec![9; 256],
topics: Default::default(),
topics: SmallVec::from_slice(&[Default::default()]),
nonce: 1010101,
};
@ -450,7 +492,7 @@ mod tests {
expiry: 100_000,
ttl: 30,
data: vec![9; 256],
topics: Default::default(),
topics: SmallVec::from_slice(&[Default::default()]),
nonce: 1010101,
};
@ -467,7 +509,7 @@ mod tests {
expiry: 100_000,
ttl: 200_000,
data: vec![9; 256],
topics: Default::default(),
topics: SmallVec::from_slice(&[Default::default()]),
nonce: 1010101,
};

View File

@ -23,31 +23,45 @@ use std::time::{Duration, SystemTime};
use std::sync::Arc;
use bigint::hash::{H256, H512};
use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, TimerToken};
use network::{HostInfo, NetworkContext, NetworkError, NodeId, PeerId, ProtocolId, TimerToken};
use ordered_float::OrderedFloat;
use parking_lot::{Mutex, RwLock};
use rlp::{DecoderError, RlpStream, UntrustedRlp};
use message::{Message, Error as MessageError};
// how often periodic relays are. when messages are imported
// we directly broadcast.
const RALLY_TOKEN: TimerToken = 1;
const RALLY_TIMEOUT_MS: u64 = 750; // supposed to be at least once per second.
const RALLY_TIMEOUT_MS: u64 = 2500;
const PROTOCOL_VERSION: usize = 2;
/// Current protocol version.
pub const PROTOCOL_VERSION: usize = 6;
/// Supported protocol versions.
pub const SUPPORTED_VERSIONS: &'static [u8] = &[PROTOCOL_VERSION as u8];
// maximum tolerated delay between messages packets.
const MAX_TOLERATED_DELAY_MS: u64 = 2000;
const MAX_TOLERATED_DELAY_MS: u64 = 5000;
/// Number of packets.
pub const PACKET_COUNT: u8 = 3;
/// Number of packets. A bunch are reserved.
pub const PACKET_COUNT: u8 = 128;
/// Whisper protocol ID
pub const PROTOCOL_ID: ::network::ProtocolId = *b"shh";
/// Parity-whisper protocol ID
/// Current parity-specific extensions:
/// - Multiple topics in packet.
pub const PARITY_PROTOCOL_ID: ::network::ProtocolId = *b"pwh";
mod packet {
pub const STATUS: u8 = 0;
pub const MESSAGES: u8 = 1;
pub const TOPIC_FILTER: u8 = 2;
pub const POW_REQUIREMENT: u8 = 2;
pub const TOPIC_FILTER: u8 = 3;
// 126, 127 for mail server stuff we will never implement here.
}
/// Handles messages within a single packet.
@ -67,11 +81,9 @@ enum Error {
Decoder(DecoderError),
Network(NetworkError),
Message(MessageError),
UnknownPacket(u8),
UnknownPeer(PeerId),
ProtocolVersionMismatch(usize),
SameNodeKey,
UnexpectedMessage,
InvalidPowReq,
}
impl From<DecoderError> for Error {
@ -98,12 +110,9 @@ impl fmt::Display for Error {
Error::Decoder(ref err) => write!(f, "Failed to decode packet: {}", err),
Error::Network(ref err) => write!(f, "Network error: {}", err),
Error::Message(ref err) => write!(f, "Error decoding message: {}", err),
Error::UnknownPacket(ref id) => write!(f, "Unknown packet kind: {}", id),
Error::UnknownPeer(ref id) => write!(f, "Message received from unknown peer: {}", id),
Error::ProtocolVersionMismatch(ref proto) =>
write!(f, "Unknown protocol version: {}", proto),
Error::UnexpectedMessage => write!(f, "Unexpected message."),
Error::SameNodeKey => write!(f, "Peer and us have same node key."),
Error::InvalidPowReq => write!(f, "Peer sent invalid PoW requirement."),
}
}
}
@ -298,15 +307,18 @@ impl Messages {
enum State {
Unconfirmed(SystemTime), // awaiting status packet.
TheirTurn(SystemTime), // it has been their turn to send since stored time.
OurTurn,
Confirmed,
}
#[allow(dead_code)] // for node key. this will be useful for topic routing.
struct Peer {
node_key: NodeId,
state: State,
known_messages: HashSet<H256>,
topic_filter: Option<H512>,
pow_requirement: f64,
is_parity: bool,
_protocol_version: usize,
}
impl Peer {
@ -319,12 +331,14 @@ impl Peer {
// whether this peer will accept the message.
fn will_accept(&self, message: &Message) -> bool {
let known = self.known_messages.contains(message.hash());
if self.known_messages.contains(message.hash()) { return false }
let matches_bloom = self.topic_filter.as_ref()
.map_or(true, |topic| topic & message.bloom() == message.bloom().clone());
// only parity peers will accept multitopic messages.
if message.envelope().is_multitopic() && !self.is_parity { return false }
if message.work_proved() < self.pow_requirement { return false }
!known && matches_bloom
self.topic_filter.as_ref()
.map_or(true, |filter| &(filter & message.bloom()) == message.bloom())
}
// note a message as known. returns true if it was already
@ -337,10 +351,14 @@ impl Peer {
self.topic_filter = Some(topic);
}
fn set_pow_requirement(&mut self, pow_requirement: f64) {
self.pow_requirement = pow_requirement;
}
fn can_send_messages(&self) -> bool {
match self.state {
State::Unconfirmed(_) | State::OurTurn => false,
State::TheirTurn(_) => true,
State::Unconfirmed(_) => false,
State::Confirmed => true,
}
}
}
@ -357,21 +375,41 @@ pub struct PoolStatus {
pub target_size: usize,
}
/// Handle to the pool, for posting messages or getting info.
#[derive(Clone)]
pub struct PoolHandle {
messages: Arc<RwLock<Messages>>,
/// Generic network context.
pub trait Context {
/// Disconnect a peer.
fn disconnect_peer(&self, PeerId);
/// Disable a peer.
fn disable_peer(&self, PeerId);
/// Get a peer's node key.
fn node_key(&self, PeerId) -> Option<NodeId>;
/// Get a peer's protocol version for given protocol.
fn protocol_version(&self, ProtocolId, PeerId) -> Option<u8>;
/// Send message to peer.
fn send(&self, PeerId, u8, Vec<u8>);
}
impl PoolHandle {
/// Post a message to the whisper network to be relayed.
pub fn post_message(&self, message: Message) -> bool {
self.messages.write().insert(message)
impl<'a> Context for NetworkContext<'a> {
fn disconnect_peer(&self, peer: PeerId) {
NetworkContext::disconnect_peer(self, peer);
}
fn disable_peer(&self, peer: PeerId) {
NetworkContext::disable_peer(self, peer)
}
fn node_key(&self, peer: PeerId) -> Option<NodeId> {
self.session_info(peer).and_then(|info| info.id)
}
fn protocol_version(&self, proto_id: ProtocolId, peer: PeerId) -> Option<u8> {
NetworkContext::protocol_version(self, proto_id, peer)
}
/// Get number of messages and amount of memory used by them.
pub fn pool_status(&self) -> PoolStatus {
self.messages.read().status()
fn send(&self, peer: PeerId, packet_id: u8, message: Vec<u8>) {
if let Err(e) = NetworkContext::send(self, peer, packet_id, message) {
debug!(target: "whisper", "Failed to send packet {} to peer {}: {}",
packet_id, peer, e);
self.disconnect_peer(peer)
}
}
}
@ -395,15 +433,23 @@ impl<T> Network<T> {
}
}
/// Acquire a sender to asynchronously feed messages to the whisper
/// network.
pub fn handle(&self) -> PoolHandle {
PoolHandle { messages: self.messages.clone() }
/// Post a message to the whisper network to be relayed.
pub fn post_message<C: Context>(&self, message: Message, context: &C) -> bool
where T: MessageHandler
{
let ok = self.messages.write().insert(message);
if ok { self.rally(context) }
ok
}
/// Get number of messages and amount of memory used by them.
pub fn pool_status(&self) -> PoolStatus {
self.messages.read().status()
}
}
impl<T: MessageHandler> Network<T> {
fn rally(&self, io: &NetworkContext) {
fn rally<C: Context>(&self, io: &C) {
// cannot be greater than 16MB (protocol limitation)
const MAX_MESSAGES_PACKET_SIZE: usize = 8 * 1024 * 1024;
@ -428,11 +474,11 @@ impl<T: MessageHandler> Network<T> {
// check timeouts and skip peers who we can't send a rally to.
match peer_data.state {
State::Unconfirmed(ref time) | State::TheirTurn(ref time) => {
State::Unconfirmed(ref time) => {
punish_timeout(time);
continue;
}
State::OurTurn => {}
State::Confirmed => {}
}
// construct packet, skipping messages the peer won't accept.
@ -452,39 +498,19 @@ impl<T: MessageHandler> Network<T> {
stream.complete_unbounded_list();
peer_data.state = State::TheirTurn(SystemTime::now());
if let Err(e) = io.send(*peer_id, packet::MESSAGES, stream.out()) {
debug!(target: "whisper", "Failed to send messages packet to peer {}: {}", peer_id, e);
io.disconnect_peer(*peer_id);
}
io.send(*peer_id, packet::MESSAGES, stream.out());
}
}
// handle status packet from peer.
fn on_status(&self, peer: &PeerId, status: UntrustedRlp)
fn on_status(&self, peer: &PeerId, _status: UntrustedRlp)
-> Result<(), Error>
{
let proto: usize = status.as_val()?;
if proto != PROTOCOL_VERSION { return Err(Error::ProtocolVersionMismatch(proto)) }
let peers = self.peers.read();
match peers.get(peer) {
Some(peer) => {
let mut peer = peer.lock();
let our_node_key = self.node_key.read().clone();
// handle this basically impossible edge case gracefully.
if peer.node_key == our_node_key {
return Err(Error::SameNodeKey);
}
// peer with lower node key begins the rally.
if peer.node_key > our_node_key {
peer.state = State::OurTurn;
} else {
peer.state = State::TheirTurn(SystemTime::now());
}
peer.lock().state = State::Confirmed;
Ok(())
}
None => {
@ -513,8 +539,6 @@ impl<T: MessageHandler> Network<T> {
return Err(Error::UnexpectedMessage);
}
peer.state = State::OurTurn;
let now = SystemTime::now();
let mut messages_vec = message_packet.iter().map(|rlp| Message::decode(rlp, now))
.collect::<Result<Vec<_>, _>>()?;
@ -541,6 +565,42 @@ impl<T: MessageHandler> Network<T> {
Ok(())
}
fn on_pow_requirement(&self, peer: &PeerId, requirement: UntrustedRlp)
-> Result<(), Error>
{
use byteorder::{ByteOrder, BigEndian};
let peers = self.peers.read();
match peers.get(peer) {
Some(peer) => {
let mut peer = peer.lock();
if let State::Unconfirmed(_) = peer.state {
return Err(Error::UnexpectedMessage);
}
let bytes: Vec<u8> = requirement.as_val()?;
if bytes.len() != ::std::mem::size_of::<f64>() {
return Err(Error::InvalidPowReq);
}
// as of byteorder 1.1.0, this is always defined.
let req = BigEndian::read_f64(&bytes[..]);
if !req.is_normal() {
return Err(Error::InvalidPowReq);
}
peer.set_pow_requirement(req);
}
None => {
debug!(target: "whisper", "Received message from unknown peer.");
return Err(Error::UnknownPeer(*peer));
}
}
Ok(())
}
fn on_topic_filter(&self, peer: &PeerId, filter: UntrustedRlp)
-> Result<(), Error>
{
@ -564,10 +624,10 @@ impl<T: MessageHandler> Network<T> {
Ok(())
}
fn on_connect(&self, io: &NetworkContext, peer: &PeerId) {
fn on_connect<C: Context>(&self, io: &C, peer: &PeerId) {
trace!(target: "whisper", "Connecting peer {}", peer);
let node_key = match io.session_info(*peer).and_then(|info| info.id) {
let node_key = match io.node_key(*peer) {
Some(node_key) => node_key,
None => {
debug!(target: "whisper", "Disconnecting peer {}, who has no node key.", peer);
@ -576,17 +636,25 @@ impl<T: MessageHandler> Network<T> {
}
};
let version = match io.protocol_version(PROTOCOL_ID, *peer) {
Some(version) => version as usize,
None => {
io.disable_peer(*peer);
return
}
};
self.peers.write().insert(*peer, Mutex::new(Peer {
node_key: node_key,
state: State::Unconfirmed(SystemTime::now()),
known_messages: HashSet::new(),
topic_filter: None,
pow_requirement: 0f64,
is_parity: io.protocol_version(PARITY_PROTOCOL_ID, *peer).is_some(),
_protocol_version: version,
}));
if let Err(e) = io.send(*peer, packet::STATUS, ::rlp::encode(&PROTOCOL_VERSION).to_vec()) {
debug!(target: "whisper", "Error sending status: {}", e);
io.disconnect_peer(*peer);
}
io.send(*peer, packet::STATUS, ::rlp::EMPTY_LIST_RLP.to_vec());
}
fn on_disconnect(&self, peer: &PeerId) {
@ -609,8 +677,9 @@ impl<T: MessageHandler> ::network::NetworkProtocolHandler for Network<T> {
let res = match packet_id {
packet::STATUS => self.on_status(peer, rlp),
packet::MESSAGES => self.on_messages(peer, rlp),
packet::POW_REQUIREMENT => self.on_pow_requirement(peer, rlp),
packet::TOPIC_FILTER => self.on_topic_filter(peer, rlp),
other => Err(Error::UnknownPacket(other)),
_ => Ok(()), // ignore unknown packets.
};
if let Err(e) = res {
@ -636,3 +705,19 @@ impl<T: MessageHandler> ::network::NetworkProtocolHandler for Network<T> {
}
}
}
/// Dummy subprotocol used for parity extensions.
#[derive(Debug, Copy, Clone)]
pub struct ParityExtensions;
impl ::network::NetworkProtocolHandler for ParityExtensions {
fn initialize(&self, _io: &NetworkContext, _host_info: &HostInfo) { }
fn read(&self, _io: &NetworkContext, _peer: &PeerId, _id: u8, _msg: &[u8]) { }
fn connected(&self, _io: &NetworkContext, _peer: &PeerId) { }
fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { }
fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) { }
}

View File

@ -307,7 +307,7 @@ impl Filter {
#[cfg(test)]
mod tests {
use message::{CreateParams, Message};
use message::{CreateParams, Message, Topic};
use rpc::types::{FilterRequest, HexEncode};
use rpc::abridge_topic;
use super::*;
@ -325,38 +325,40 @@ mod tests {
#[test]
fn basic_match() {
let topics = vec![vec![1, 2, 3], vec![4, 5, 6]];
let topics = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]];
let abridged_topics: Vec<_> = topics.iter().map(|x| abridge_topic(&x)).collect();
let req = FilterRequest {
decrypt_with: Default::default(),
from: None,
topics: topics.iter().cloned().map(HexEncode).collect(),
topics: topics.into_iter().map(HexEncode).collect(),
};
let filter = Filter::new(req).unwrap();
let message = Message::create(CreateParams {
ttl: 100,
payload: vec![1, 3, 5, 7, 9],
topics: topics.iter().map(|x| abridge_topic(&x)).collect(),
topics: abridged_topics.clone(),
work: 0,
});
}).unwrap();
assert!(filter.basic_matches(&message));
let message = Message::create(CreateParams {
ttl: 100,
payload: vec![1, 3, 5, 7, 9],
topics: topics.iter().take(1).map(|x| abridge_topic(&x)).collect(),
topics: abridged_topics.clone(),
work: 0,
});
}).unwrap();
assert!(filter.basic_matches(&message));
let message = Message::create(CreateParams {
ttl: 100,
payload: vec![1, 3, 5, 7, 9],
topics: Vec::new(),
topics: vec![Topic([1, 8, 3, 99])],
work: 0,
});
}).unwrap();
assert!(!filter.basic_matches(&message));
}
@ -366,6 +368,9 @@ mod tests {
use rpc::payload::{self, EncodeParams};
use rpc::key_store::{Key, KeyStore};
let topics = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]];
let abridged_topics: Vec<_> = topics.iter().map(|x| abridge_topic(&x)).collect();
let mut store = KeyStore::new().unwrap();
let signing_pair = Key::new_asymmetric(store.rng());
let encrypting_key = Key::new_symmetric(store.rng());
@ -386,24 +391,25 @@ mod tests {
let message = Message::create(CreateParams {
ttl: 100,
payload: encrypted,
topics: vec![abridge_topic(&[9; 32])],
topics: abridged_topics.clone(),
work: 0,
});
}).unwrap();
let message2 = Message::create(CreateParams {
ttl: 100,
payload: vec![3, 5, 7, 9],
topics: vec![abridge_topic(&[9; 32])],
topics: abridged_topics,
work: 0,
});
}).unwrap();
let filter = Filter::new(FilterRequest {
decrypt_with: Some(HexEncode(decrypt_id)),
from: Some(HexEncode(signing_pair.public().unwrap().clone())),
topics: vec![HexEncode(vec![9; 32])],
topics: topics.into_iter().map(HexEncode).collect(),
}).unwrap();
assert!(filter.basic_matches(&message));
assert!(filter.basic_matches(&message2));
let items = ::std::cell::Cell::new(0);
let on_match = |_| { items.set(items.get() + 1); };

View File

@ -155,16 +155,6 @@ pub trait PoolHandle: Send + Sync {
fn pool_status(&self) -> ::net::PoolStatus;
}
impl PoolHandle for ::net::PoolHandle {
fn relay(&self, message: Message) -> bool {
self.post_message(message)
}
fn pool_status(&self) -> ::net::PoolStatus {
::net::PoolHandle::pool_status(self)
}
}
/// Default, simple metadata implementation.
#[derive(Clone, Default)]
pub struct Meta {
@ -339,7 +329,7 @@ impl<P: PoolHandle + 'static, M: Send + Sync + 'static> Whisper for WhisperClien
payload: encrypted,
topics: req.topics.into_iter().map(|x| abridge_topic(&x.into_inner())).collect(),
work: req.priority,
});
}).map_err(|_| whisper_error("Empty topics"))?;
if !self.pool.relay(message) {
Err(whisper_error("PoW too low to compete with other messages"))

View File

@ -221,7 +221,7 @@ pub struct FilterItem {
/// Time to live in seconds.
pub ttl: u64,
/// Abridged topics that matched the filter.
/// Topics that matched the filter.
pub topics: Vec<Bytes>,
/// Unix timestamp of the message generation.