Merge branch 'master' into ui-2
This commit is contained in:
commit
e0927f099e
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1783,7 +1783,7 @@ dependencies = [
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "parity-ui-precompiled"
|
name = "parity-ui-precompiled"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
source = "git+https://github.com/paritytech/js-precompiled.git#3dd953a83569af644c5737a22c0ceb7d5f68b138"
|
source = "git+https://github.com/paritytech/js-precompiled.git#bc28cda725b0bfe655b807611091389e9fcdaac8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"parity-dapps-glue 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
"parity-dapps-glue 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
]
|
]
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "parity.js",
|
"name": "parity.js",
|
||||||
"version": "1.7.77",
|
"version": "1.7.78",
|
||||||
"main": "release/index.js",
|
"main": "release/index.js",
|
||||||
"jsnext:main": "src/index.js",
|
"jsnext:main": "src/index.js",
|
||||||
"author": "Parity Team <admin@parity.io>",
|
"author": "Parity Team <admin@parity.io>",
|
||||||
|
@ -195,11 +195,6 @@ export default class Ws extends JsonRpcBase {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_onMessage = (event) => {
|
_onMessage = (event) => {
|
||||||
// Event sent by Signer Broadcaster
|
|
||||||
if (event.data === 'new_message') {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = JSON.parse(event.data);
|
const result = JSON.parse(event.data);
|
||||||
const { method, params, json, resolve, reject } = this._messages[result.id];
|
const { method, params, json, resolve, reject } = this._messages[result.id];
|
||||||
|
@ -552,7 +552,7 @@ The following options are possible for the \`defaultBlock\` parameter:
|
|||||||
blockNumber: fromDecimal(5599),
|
blockNumber: fromDecimal(5599),
|
||||||
transactionIndex: fromDecimal(1),
|
transactionIndex: fromDecimal(1),
|
||||||
from: '0x407d73d8a49eeb85d32cf465507dd71d507100c1',
|
from: '0x407d73d8a49eeb85d32cf465507dd71d507100c1',
|
||||||
to: '0x85h43d8a49eeb85d32cf465507dd71d507100c1',
|
to: '0x853f43d8a49eeb85d32cf465507dd71d507100c1',
|
||||||
value: fromDecimal(520464),
|
value: fromDecimal(520464),
|
||||||
gas: fromDecimal(520464),
|
gas: fromDecimal(520464),
|
||||||
gasPrice: '0x09184e72a000',
|
gasPrice: '0x09184e72a000',
|
||||||
@ -969,24 +969,24 @@ The following options are possible for the \`defaultBlock\` parameter:
|
|||||||
},
|
},
|
||||||
|
|
||||||
sign: {
|
sign: {
|
||||||
desc: 'Signs transaction hash with a given address.',
|
desc: 'The sign method calculates an Ethereum specific signature with: `sign(keccak256("\x19Ethereum Signed Message:\n" + len(message) + message)))`.',
|
||||||
params: [
|
params: [
|
||||||
{
|
{
|
||||||
type: Address,
|
type: Address,
|
||||||
desc: '20 Bytes - address.',
|
desc: '20 Bytes - address.',
|
||||||
format: 'inputAddressFormatter',
|
format: 'inputAddressFormatter',
|
||||||
example: '0xd1ade25ccd3d550a7eb532ac759cac7be09c2719'
|
example: '0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826'
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
type: Data,
|
type: Data,
|
||||||
desc: 'Transaction hash to sign.',
|
desc: 'Data which hash to sign.',
|
||||||
example: withComment('0x5363686f6f6c627573', 'Schoolbus')
|
example: withComment('0x5363686f6f6c627573', 'Schoolbus')
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
returns: {
|
returns: {
|
||||||
type: Data,
|
type: Data,
|
||||||
desc: 'Signed data.',
|
desc: 'Signed data.',
|
||||||
example: '0x2ac19db245478a06032e69cdbd2b54e648b78431d0a47bd1fbab18f79f820ba407466e37adbe9e84541cab97ab7d290f4a64a5825c876d22109f3bf813254e8628'
|
example: '0xb1092cb5b23c2aa55e5b5787729c6be812509376de99a52bea2b41e5a5f8601c5641e74d01e4493c17bf1ef8b179c49362b2c721222128d58422a539310c6ecd1b'
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
@ -1068,7 +1068,7 @@ The following options are possible for the \`defaultBlock\` parameter:
|
|||||||
blockNumber: fromDecimal(5599),
|
blockNumber: fromDecimal(5599),
|
||||||
transactionIndex: fromDecimal(1),
|
transactionIndex: fromDecimal(1),
|
||||||
from: '0x407d73d8a49eeb85d32cf465507dd71d507100c1',
|
from: '0x407d73d8a49eeb85d32cf465507dd71d507100c1',
|
||||||
to: '0x85h43d8a49eeb85d32cf465507dd71d507100c1',
|
to: '0x853f43d8a49eeb85d32cf465507dd71d507100c1',
|
||||||
value: fromDecimal(520464),
|
value: fromDecimal(520464),
|
||||||
gas: fromDecimal(520464),
|
gas: fromDecimal(520464),
|
||||||
gasPrice: '0x09184e72a000',
|
gasPrice: '0x09184e72a000',
|
||||||
|
@ -258,7 +258,7 @@ impl FullDependencies {
|
|||||||
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());
|
handler.extend_with(PersonalClient::new(&self.secret_store, dispatcher.clone(), self.geth_compatibility).to_delegate());
|
||||||
},
|
},
|
||||||
Api::Signer => {
|
Api::Signer => {
|
||||||
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
|
handler.extend_with(SignerClient::new(&self.secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
|
||||||
},
|
},
|
||||||
Api::Parity => {
|
Api::Parity => {
|
||||||
let signer = match self.signer_service.is_enabled() {
|
let signer = match self.signer_service.is_enabled() {
|
||||||
@ -352,6 +352,7 @@ pub struct LightDependencies {
|
|||||||
pub dapps_port: Option<u16>,
|
pub dapps_port: Option<u16>,
|
||||||
pub fetch: FetchClient,
|
pub fetch: FetchClient,
|
||||||
pub geth_compatibility: bool,
|
pub geth_compatibility: bool,
|
||||||
|
pub remote: parity_reactor::Remote,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Dependencies for LightDependencies {
|
impl Dependencies for LightDependencies {
|
||||||
@ -415,7 +416,7 @@ impl Dependencies for LightDependencies {
|
|||||||
},
|
},
|
||||||
Api::Signer => {
|
Api::Signer => {
|
||||||
let secret_store = Some(self.secret_store.clone());
|
let secret_store = Some(self.secret_store.clone());
|
||||||
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service).to_delegate());
|
handler.extend_with(SignerClient::new(&secret_store, dispatcher.clone(), &self.signer_service, self.remote.clone()).to_delegate());
|
||||||
},
|
},
|
||||||
Api::Parity => {
|
Api::Parity => {
|
||||||
let signer = match self.signer_service.is_enabled() {
|
let signer = match self.signer_service.is_enabled() {
|
||||||
|
@ -292,6 +292,7 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
|
|||||||
},
|
},
|
||||||
fetch: fetch,
|
fetch: fetch,
|
||||||
geth_compatibility: cmd.geth_compatibility,
|
geth_compatibility: cmd.geth_compatibility,
|
||||||
|
remote: event_loop.remote(),
|
||||||
});
|
});
|
||||||
|
|
||||||
let dependencies = rpc::Dependencies {
|
let dependencies = rpc::Dependencies {
|
||||||
|
@ -33,6 +33,7 @@ mod poll_filter;
|
|||||||
mod requests;
|
mod requests;
|
||||||
mod signer;
|
mod signer;
|
||||||
mod signing_queue;
|
mod signing_queue;
|
||||||
|
mod subscribers;
|
||||||
mod subscription_manager;
|
mod subscription_manager;
|
||||||
|
|
||||||
pub use self::dispatch::{Dispatcher, FullDispatcher};
|
pub use self::dispatch::{Dispatcher, FullDispatcher};
|
||||||
@ -47,4 +48,5 @@ pub use self::signing_queue::{
|
|||||||
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
|
QUEUE_LIMIT as SIGNING_QUEUE_LIMIT,
|
||||||
};
|
};
|
||||||
pub use self::signer::SignerService;
|
pub use self::signer::SignerService;
|
||||||
|
pub use self::subscribers::Subscribers;
|
||||||
pub use self::subscription_manager::GenericPollManager;
|
pub use self::subscription_manager::GenericPollManager;
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
|
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::sync::{mpsc, Arc};
|
use std::sync::Arc;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use jsonrpc_core;
|
use jsonrpc_core;
|
||||||
use util::{Mutex, RwLock, U256, Address};
|
use util::{Mutex, RwLock, U256, Address};
|
||||||
@ -27,7 +27,6 @@ use v1::types::{ConfirmationResponse, H160 as RpcH160, Origin, DappId as RpcDapp
|
|||||||
/// Result that can be returned from JSON RPC.
|
/// Result that can be returned from JSON RPC.
|
||||||
pub type RpcResult = Result<ConfirmationResponse, jsonrpc_core::Error>;
|
pub type RpcResult = Result<ConfirmationResponse, jsonrpc_core::Error>;
|
||||||
|
|
||||||
|
|
||||||
/// Type of default account
|
/// Type of default account
|
||||||
pub enum DefaultAccount {
|
pub enum DefaultAccount {
|
||||||
/// Default account is known
|
/// Default account is known
|
||||||
@ -49,7 +48,7 @@ impl From<RpcH160> for DefaultAccount {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Possible events happening in the queue that can be listened to.
|
/// Possible events happening in the queue that can be listened to.
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq, Clone)]
|
||||||
pub enum QueueEvent {
|
pub enum QueueEvent {
|
||||||
/// Receiver should stop work upon receiving `Finish` message.
|
/// Receiver should stop work upon receiving `Finish` message.
|
||||||
Finish,
|
Finish,
|
||||||
@ -61,15 +60,6 @@ pub enum QueueEvent {
|
|||||||
RequestConfirmed(U256),
|
RequestConfirmed(U256),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Defines possible errors returned from queue receiving method.
|
|
||||||
#[derive(Debug, PartialEq)]
|
|
||||||
pub enum QueueError {
|
|
||||||
/// Returned when method has been already used (no receiver available).
|
|
||||||
AlreadyUsed,
|
|
||||||
/// Returned when receiver encounters an error.
|
|
||||||
ReceiverError(mpsc::RecvError),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Defines possible errors when inserting to queue
|
/// Defines possible errors when inserting to queue
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, PartialEq)]
|
||||||
pub enum QueueAddError {
|
pub enum QueueAddError {
|
||||||
@ -184,59 +174,31 @@ impl ConfirmationPromise {
|
|||||||
|
|
||||||
|
|
||||||
/// Queue for all unconfirmed requests.
|
/// Queue for all unconfirmed requests.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct ConfirmationsQueue {
|
pub struct ConfirmationsQueue {
|
||||||
id: Mutex<U256>,
|
id: Mutex<U256>,
|
||||||
queue: RwLock<BTreeMap<U256, ConfirmationToken>>,
|
queue: RwLock<BTreeMap<U256, ConfirmationToken>>,
|
||||||
sender: Mutex<mpsc::Sender<QueueEvent>>,
|
on_event: RwLock<Vec<Box<Fn(QueueEvent) -> () + Send + Sync>>>,
|
||||||
receiver: Mutex<Option<mpsc::Receiver<QueueEvent>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ConfirmationsQueue {
|
|
||||||
fn default() -> Self {
|
|
||||||
let (send, recv) = mpsc::channel();
|
|
||||||
|
|
||||||
ConfirmationsQueue {
|
|
||||||
id: Mutex::new(U256::from(0)),
|
|
||||||
queue: RwLock::new(BTreeMap::new()),
|
|
||||||
sender: Mutex::new(send),
|
|
||||||
receiver: Mutex::new(Some(recv)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConfirmationsQueue {
|
impl ConfirmationsQueue {
|
||||||
|
/// Adds a queue listener. For each event, `listener` callback will be invoked.
|
||||||
/// Blocks the thread and starts listening for notifications regarding all actions in the queue.
|
pub fn on_event<F: Fn(QueueEvent) -> () + Send + Sync + 'static>(&self, listener: F) {
|
||||||
/// For each event, `listener` callback will be invoked.
|
self.on_event.write().push(Box::new(listener));
|
||||||
/// This method can be used only once (only single consumer of events can exist).
|
|
||||||
pub fn start_listening<F>(&self, listener: F) -> Result<(), QueueError>
|
|
||||||
where F: Fn(QueueEvent) -> () {
|
|
||||||
let recv = self.receiver.lock().take();
|
|
||||||
if let None = recv {
|
|
||||||
return Err(QueueError::AlreadyUsed);
|
|
||||||
}
|
|
||||||
let recv = recv.expect("Check for none is done earlier.");
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let message = recv.recv().map_err(|e| QueueError::ReceiverError(e))?;
|
|
||||||
if let QueueEvent::Finish = message {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
listener(message);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notifies consumer that the communcation is over.
|
/// Notifies consumer that the communcation is over.
|
||||||
/// No more events will be sent after this function is invoked.
|
/// No more events will be sent after this function is invoked.
|
||||||
pub fn finish(&self) {
|
pub fn finish(&self) {
|
||||||
self.notify(QueueEvent::Finish);
|
self.notify(QueueEvent::Finish);
|
||||||
|
self.on_event.write().clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Notifies receiver about the event happening in this queue.
|
/// Notifies receiver about the event happening in this queue.
|
||||||
fn notify(&self, message: QueueEvent) {
|
fn notify(&self, message: QueueEvent) {
|
||||||
// We don't really care about the result
|
for listener in &*self.on_event.read() {
|
||||||
let _ = self.sender.lock().send(message);
|
listener(message.clone())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result.
|
/// Removes requests from this queue and notifies `ConfirmationPromise` holders about the result.
|
||||||
@ -384,26 +346,23 @@ mod test {
|
|||||||
#[test]
|
#[test]
|
||||||
fn should_receive_notification() {
|
fn should_receive_notification() {
|
||||||
// given
|
// given
|
||||||
let received = Arc::new(Mutex::new(None));
|
let received = Arc::new(Mutex::new(vec![]));
|
||||||
let queue = Arc::new(ConfirmationsQueue::default());
|
let queue = Arc::new(ConfirmationsQueue::default());
|
||||||
let request = request();
|
let request = request();
|
||||||
|
|
||||||
// when
|
// when
|
||||||
let q = queue.clone();
|
|
||||||
let r = received.clone();
|
let r = received.clone();
|
||||||
let handle = thread::spawn(move || {
|
queue.on_event(move |notification| {
|
||||||
q.start_listening(move |notification| {
|
r.lock().push(notification);
|
||||||
let mut v = r.lock();
|
|
||||||
*v = Some(notification);
|
|
||||||
}).expect("Should be closed nicely.")
|
|
||||||
});
|
});
|
||||||
queue.add_request(request, Default::default()).unwrap();
|
queue.add_request(request, Default::default()).unwrap();
|
||||||
queue.finish();
|
queue.finish();
|
||||||
|
|
||||||
// then
|
// then
|
||||||
handle.join().expect("Thread should finish nicely");
|
let r = received.lock();
|
||||||
let r = received.lock().take();
|
assert_eq!(r[0], QueueEvent::NewRequest(U256::from(1)));
|
||||||
assert_eq!(r, Some(QueueEvent::NewRequest(U256::from(1))));
|
assert_eq!(r[1], QueueEvent::Finish);
|
||||||
|
assert_eq!(r.len(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
86
rpc/src/v1/helpers/subscribers.rs
Normal file
86
rpc/src/v1/helpers/subscribers.rs
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
// Copyright 2015-2017 Parity Technologies (UK) Ltd.
|
||||||
|
// This file is part of Parity.
|
||||||
|
|
||||||
|
// Parity is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// Parity is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU General Public License
|
||||||
|
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
//! A map of subscribers.
|
||||||
|
|
||||||
|
use std::ops;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use jsonrpc_macros::pubsub::{Subscriber, Sink, SubscriptionId};
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Subscribers<T> {
|
||||||
|
next_id: u64,
|
||||||
|
subscriptions: HashMap<u64, T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Default for Subscribers<T> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Subscribers {
|
||||||
|
next_id: 0,
|
||||||
|
subscriptions: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Subscribers<T> {
|
||||||
|
fn next_id(&mut self) -> u64 {
|
||||||
|
self.next_id += 1;
|
||||||
|
self.next_id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert new subscription and return assigned id.
|
||||||
|
pub fn insert(&mut self, val: T) -> SubscriptionId {
|
||||||
|
let id = self.next_id();
|
||||||
|
debug!(target: "pubsub", "Adding subscription id={}", id);
|
||||||
|
self.subscriptions.insert(id, val);
|
||||||
|
SubscriptionId::Number(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes subscription with given id and returns it (if any).
|
||||||
|
pub fn remove(&mut self, id: &SubscriptionId) -> Option<T> {
|
||||||
|
trace!(target: "pubsub", "Removing subscription id={:?}", id);
|
||||||
|
match *id {
|
||||||
|
SubscriptionId::Number(id) => {
|
||||||
|
self.subscriptions.remove(&id)
|
||||||
|
},
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <T> Subscribers<Sink<T>> {
|
||||||
|
/// Assigns id and adds a subscriber to the list.
|
||||||
|
pub fn push(&mut self, sub: Subscriber<T>) {
|
||||||
|
let id = self.next_id();
|
||||||
|
match sub.assign_id(SubscriptionId::Number(id)) {
|
||||||
|
Ok(sink) => {
|
||||||
|
debug!(target: "pubsub", "Adding subscription id={:?}", id);
|
||||||
|
self.subscriptions.insert(id, sink);
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
self.next_id -= 1;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> ops::Deref for Subscribers<T> {
|
||||||
|
type Target = HashMap<u64, T>;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.subscriptions
|
||||||
|
}
|
||||||
|
}
|
@ -17,14 +17,15 @@
|
|||||||
//! Generic poll manager for Pub-Sub.
|
//! Generic poll manager for Pub-Sub.
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::collections::HashMap;
|
|
||||||
use util::Mutex;
|
use util::Mutex;
|
||||||
|
|
||||||
use jsonrpc_core::futures::future::{self, Either};
|
use jsonrpc_core::futures::future::{self, Either};
|
||||||
use jsonrpc_core::futures::sync::mpsc;
|
use jsonrpc_core::futures::sync::mpsc;
|
||||||
use jsonrpc_core::futures::{Sink, Future, BoxFuture};
|
use jsonrpc_core::futures::{Sink, Future, BoxFuture};
|
||||||
use jsonrpc_core::{self as core, MetaIoHandler};
|
use jsonrpc_core::{self as core, MetaIoHandler};
|
||||||
|
use jsonrpc_pubsub::SubscriptionId;
|
||||||
|
|
||||||
|
use v1::helpers::Subscribers;
|
||||||
use v1::metadata::Metadata;
|
use v1::metadata::Metadata;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -40,8 +41,7 @@ struct Subscription {
|
|||||||
/// TODO [ToDr] Depending on the method decide on poll interval.
|
/// TODO [ToDr] Depending on the method decide on poll interval.
|
||||||
/// For most of the methods it will be enough to poll on new block instead of time-interval.
|
/// For most of the methods it will be enough to poll on new block instead of time-interval.
|
||||||
pub struct GenericPollManager<S: core::Middleware<Metadata>> {
|
pub struct GenericPollManager<S: core::Middleware<Metadata>> {
|
||||||
next_id: usize,
|
subscribers: Subscribers<Subscription>,
|
||||||
poll_subscriptions: HashMap<usize, Subscription>,
|
|
||||||
rpc: MetaIoHandler<Metadata, S>,
|
rpc: MetaIoHandler<Metadata, S>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,21 +49,16 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
|
|||||||
/// Creates new poll manager
|
/// Creates new poll manager
|
||||||
pub fn new(rpc: MetaIoHandler<Metadata, S>) -> Self {
|
pub fn new(rpc: MetaIoHandler<Metadata, S>) -> Self {
|
||||||
GenericPollManager {
|
GenericPollManager {
|
||||||
next_id: 1,
|
subscribers: Default::default(),
|
||||||
poll_subscriptions: Default::default(),
|
|
||||||
rpc: rpc,
|
rpc: rpc,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Subscribes to update from polling given method.
|
/// Subscribes to update from polling given method.
|
||||||
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
|
pub fn subscribe(&mut self, metadata: Metadata, method: String, params: core::Params)
|
||||||
-> (usize, mpsc::Receiver<Result<core::Value, core::Error>>)
|
-> (SubscriptionId, mpsc::Receiver<Result<core::Value, core::Error>>)
|
||||||
{
|
{
|
||||||
let id = self.next_id;
|
|
||||||
self.next_id += 1;
|
|
||||||
|
|
||||||
let (sink, stream) = mpsc::channel(1);
|
let (sink, stream) = mpsc::channel(1);
|
||||||
|
|
||||||
let subscription = Subscription {
|
let subscription = Subscription {
|
||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
method: method,
|
method: method,
|
||||||
@ -71,21 +66,19 @@ impl<S: core::Middleware<Metadata>> GenericPollManager<S> {
|
|||||||
sink: sink,
|
sink: sink,
|
||||||
last_result: Default::default(),
|
last_result: Default::default(),
|
||||||
};
|
};
|
||||||
|
let id = self.subscribers.insert(subscription);
|
||||||
debug!(target: "pubsub", "Adding subscription id={:?}, {:?}", id, subscription);
|
|
||||||
self.poll_subscriptions.insert(id, subscription);
|
|
||||||
(id, stream)
|
(id, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unsubscribe(&mut self, id: usize) -> bool {
|
pub fn unsubscribe(&mut self, id: &SubscriptionId) -> bool {
|
||||||
debug!(target: "pubsub", "Removing subscription: {:?}", id);
|
debug!(target: "pubsub", "Removing subscription: {:?}", id);
|
||||||
self.poll_subscriptions.remove(&id).is_some()
|
self.subscribers.remove(id).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn tick(&self) -> BoxFuture<(), ()> {
|
pub fn tick(&self) -> BoxFuture<(), ()> {
|
||||||
let mut futures = Vec::new();
|
let mut futures = Vec::new();
|
||||||
// poll all subscriptions
|
// poll all subscriptions
|
||||||
for (id, subscription) in self.poll_subscriptions.iter() {
|
for (id, subscription) in self.subscribers.iter() {
|
||||||
let call = core::MethodCall {
|
let call = core::MethodCall {
|
||||||
jsonrpc: Some(core::Version::V2),
|
jsonrpc: Some(core::Version::V2),
|
||||||
id: core::Id::Num(*id as u64),
|
id: core::Id::Num(*id as u64),
|
||||||
@ -130,6 +123,7 @@ mod tests {
|
|||||||
|
|
||||||
use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params};
|
use jsonrpc_core::{MetaIoHandler, NoopMiddleware, Value, Params};
|
||||||
use jsonrpc_core::futures::{Future, Stream};
|
use jsonrpc_core::futures::{Future, Stream};
|
||||||
|
use jsonrpc_pubsub::SubscriptionId;
|
||||||
use http::tokio_core::reactor;
|
use http::tokio_core::reactor;
|
||||||
|
|
||||||
use super::GenericPollManager;
|
use super::GenericPollManager;
|
||||||
@ -154,7 +148,7 @@ mod tests {
|
|||||||
let mut el = reactor::Core::new().unwrap();
|
let mut el = reactor::Core::new().unwrap();
|
||||||
let mut poll_manager = poll_manager();
|
let mut poll_manager = poll_manager();
|
||||||
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
|
let (id, rx) = poll_manager.subscribe(Default::default(), "hello".into(), Params::None);
|
||||||
assert_eq!(id, 1);
|
assert_eq!(id, SubscriptionId::Number(1));
|
||||||
|
|
||||||
// then
|
// then
|
||||||
poll_manager.tick().wait().unwrap();
|
poll_manager.tick().wait().unwrap();
|
||||||
@ -169,7 +163,7 @@ mod tests {
|
|||||||
// and no more notifications
|
// and no more notifications
|
||||||
poll_manager.tick().wait().unwrap();
|
poll_manager.tick().wait().unwrap();
|
||||||
// we need to unsubscribe otherwise the future will never finish.
|
// we need to unsubscribe otherwise the future will never finish.
|
||||||
poll_manager.unsubscribe(1);
|
poll_manager.unsubscribe(&id);
|
||||||
assert_eq!(el.run(rx.into_future()).unwrap().0, None);
|
assert_eq!(el.run(rx.into_future()).unwrap().0, None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -70,7 +70,7 @@ impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
|
|||||||
|
|
||||||
let mut poll_manager = self.poll_manager.write();
|
let mut poll_manager = self.poll_manager.write();
|
||||||
let (id, receiver) = poll_manager.subscribe(meta, method, params);
|
let (id, receiver) = poll_manager.subscribe(meta, method, params);
|
||||||
match subscriber.assign_id(SubscriptionId::Number(id as u64)) {
|
match subscriber.assign_id(id.clone()) {
|
||||||
Ok(sink) => {
|
Ok(sink) => {
|
||||||
self.remote.spawn(receiver.map(|res| match res {
|
self.remote.spawn(receiver.map(|res| match res {
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
@ -83,18 +83,13 @@ impl<S: core::Middleware<Metadata>> PubSub for PubSubClient<S> {
|
|||||||
})).map(|_| ()));
|
})).map(|_| ()));
|
||||||
},
|
},
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
poll_manager.unsubscribe(id);
|
poll_manager.unsubscribe(&id);
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
|
fn parity_unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
|
||||||
let res = if let SubscriptionId::Number(id) = id {
|
let res = self.poll_manager.write().unsubscribe(&id);
|
||||||
self.poll_manager.write().unsubscribe(id as usize)
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
};
|
|
||||||
|
|
||||||
futures::future::ok(res).boxed()
|
futures::future::ok(res).boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,16 +18,21 @@
|
|||||||
|
|
||||||
use std::sync::{Arc, Weak};
|
use std::sync::{Arc, Weak};
|
||||||
|
|
||||||
use rlp::UntrustedRlp;
|
|
||||||
use ethcore::account_provider::AccountProvider;
|
use ethcore::account_provider::AccountProvider;
|
||||||
use ethcore::transaction::{SignedTransaction, PendingTransaction};
|
use ethcore::transaction::{SignedTransaction, PendingTransaction};
|
||||||
use ethkey;
|
use ethkey;
|
||||||
use futures::{future, BoxFuture, Future, IntoFuture};
|
use futures::{future, BoxFuture, Future, IntoFuture};
|
||||||
|
use parity_reactor::Remote;
|
||||||
|
use rlp::UntrustedRlp;
|
||||||
|
use util::Mutex;
|
||||||
|
|
||||||
use jsonrpc_core::Error;
|
use jsonrpc_core::{futures, Error};
|
||||||
|
use jsonrpc_pubsub::SubscriptionId;
|
||||||
|
use jsonrpc_macros::pubsub::{Sink, Subscriber};
|
||||||
use v1::helpers::accounts::unwrap_provider;
|
use v1::helpers::accounts::unwrap_provider;
|
||||||
use v1::helpers::dispatch::{self, Dispatcher, WithToken, eth_data_hash};
|
use v1::helpers::dispatch::{self, Dispatcher, WithToken, eth_data_hash};
|
||||||
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest};
|
use v1::helpers::{errors, SignerService, SigningQueue, ConfirmationPayload, FilledTransactionRequest, Subscribers};
|
||||||
|
use v1::metadata::Metadata;
|
||||||
use v1::traits::Signer;
|
use v1::traits::Signer;
|
||||||
use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes};
|
use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken, U256, Bytes};
|
||||||
|
|
||||||
@ -35,21 +40,40 @@ use v1::types::{TransactionModification, ConfirmationRequest, ConfirmationRespon
|
|||||||
pub struct SignerClient<D: Dispatcher> {
|
pub struct SignerClient<D: Dispatcher> {
|
||||||
signer: Weak<SignerService>,
|
signer: Weak<SignerService>,
|
||||||
accounts: Option<Weak<AccountProvider>>,
|
accounts: Option<Weak<AccountProvider>>,
|
||||||
dispatcher: D
|
dispatcher: D,
|
||||||
|
subscribers: Arc<Mutex<Subscribers<Sink<Vec<ConfirmationRequest>>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Dispatcher + 'static> SignerClient<D> {
|
impl<D: Dispatcher + 'static> SignerClient<D> {
|
||||||
|
|
||||||
/// Create new instance of signer client.
|
/// Create new instance of signer client.
|
||||||
pub fn new(
|
pub fn new(
|
||||||
store: &Option<Arc<AccountProvider>>,
|
store: &Option<Arc<AccountProvider>>,
|
||||||
dispatcher: D,
|
dispatcher: D,
|
||||||
signer: &Arc<SignerService>,
|
signer: &Arc<SignerService>,
|
||||||
|
remote: Remote,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let subscribers = Arc::new(Mutex::new(Subscribers::default()));
|
||||||
|
let subs = Arc::downgrade(&subscribers);
|
||||||
|
let s = Arc::downgrade(signer);
|
||||||
|
signer.queue().on_event(move |_event| {
|
||||||
|
if let (Some(s), Some(subs)) = (s.upgrade(), subs.upgrade()) {
|
||||||
|
let requests = s.requests().into_iter().map(Into::into).collect::<Vec<ConfirmationRequest>>();
|
||||||
|
for subscription in subs.lock().values() {
|
||||||
|
let subscription: &Sink<_> = subscription;
|
||||||
|
remote.spawn(subscription
|
||||||
|
.notify(requests.clone())
|
||||||
|
.map(|_| ())
|
||||||
|
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
SignerClient {
|
SignerClient {
|
||||||
signer: Arc::downgrade(signer),
|
signer: Arc::downgrade(signer),
|
||||||
accounts: store.as_ref().map(Arc::downgrade),
|
accounts: store.as_ref().map(Arc::downgrade),
|
||||||
dispatcher: dispatcher,
|
dispatcher: dispatcher,
|
||||||
|
subscribers: subscribers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,10 +163,10 @@ impl<D: Dispatcher + 'static> SignerClient<D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
|
impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
|
||||||
|
type Metadata = Metadata;
|
||||||
|
|
||||||
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> {
|
fn requests_to_confirm(&self) -> Result<Vec<ConfirmationRequest>, Error> {
|
||||||
let signer = take_weak!(self.signer);
|
let signer = take_weak!(self.signer);
|
||||||
|
|
||||||
Ok(signer.requests()
|
Ok(signer.requests()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(Into::into)
|
.map(Into::into)
|
||||||
@ -214,23 +238,26 @@ impl<D: Dispatcher + 'static> Signer for SignerClient<D> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn reject_request(&self, id: U256) -> Result<bool, Error> {
|
fn reject_request(&self, id: U256) -> Result<bool, Error> {
|
||||||
let signer = take_weak!(self.signer);
|
let res = take_weak!(self.signer).request_rejected(id.into());
|
||||||
|
|
||||||
let res = signer.request_rejected(id.into());
|
|
||||||
Ok(res.is_some())
|
Ok(res.is_some())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_token(&self) -> Result<String, Error> {
|
fn generate_token(&self) -> Result<String, Error> {
|
||||||
let signer = take_weak!(self.signer);
|
take_weak!(self.signer).generate_token()
|
||||||
|
|
||||||
signer.generate_token()
|
|
||||||
.map_err(|e| errors::token(e))
|
.map_err(|e| errors::token(e))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn generate_web_proxy_token(&self) -> Result<String, Error> {
|
fn generate_web_proxy_token(&self) -> Result<String, Error> {
|
||||||
let signer = take_weak!(self.signer);
|
Ok(take_weak!(self.signer).generate_web_proxy_access_token())
|
||||||
|
}
|
||||||
|
|
||||||
Ok(signer.generate_web_proxy_access_token())
|
fn subscribe_pending(&self, _meta: Self::Metadata, sub: Subscriber<Vec<ConfirmationRequest>>) {
|
||||||
|
self.subscribers.lock().push(sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unsubscribe_pending(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
|
||||||
|
let res = self.subscribers.lock().remove(&id).is_some();
|
||||||
|
futures::future::ok(res).boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,7 @@ use util::{U256, Uint, Address, ToPretty};
|
|||||||
use ethcore::account_provider::AccountProvider;
|
use ethcore::account_provider::AccountProvider;
|
||||||
use ethcore::client::TestBlockChainClient;
|
use ethcore::client::TestBlockChainClient;
|
||||||
use ethcore::transaction::{Transaction, Action, SignedTransaction};
|
use ethcore::transaction::{Transaction, Action, SignedTransaction};
|
||||||
|
use parity_reactor::EventLoop;
|
||||||
use rlp::encode;
|
use rlp::encode;
|
||||||
|
|
||||||
use serde_json;
|
use serde_json;
|
||||||
@ -40,6 +41,7 @@ struct SignerTester {
|
|||||||
// these unused fields are necessary to keep the data alive
|
// these unused fields are necessary to keep the data alive
|
||||||
// as the handler has only weak pointers.
|
// as the handler has only weak pointers.
|
||||||
_client: Arc<TestBlockChainClient>,
|
_client: Arc<TestBlockChainClient>,
|
||||||
|
_event_loop: EventLoop,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn blockchain_client() -> Arc<TestBlockChainClient> {
|
fn blockchain_client() -> Arc<TestBlockChainClient> {
|
||||||
@ -61,10 +63,11 @@ fn signer_tester() -> SignerTester {
|
|||||||
let opt_accounts = Some(accounts.clone());
|
let opt_accounts = Some(accounts.clone());
|
||||||
let client = blockchain_client();
|
let client = blockchain_client();
|
||||||
let miner = miner_service();
|
let miner = miner_service();
|
||||||
|
let event_loop = EventLoop::spawn();
|
||||||
|
|
||||||
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
|
let dispatcher = FullDispatcher::new(Arc::downgrade(&client), Arc::downgrade(&miner));
|
||||||
let mut io = IoHandler::default();
|
let mut io = IoHandler::default();
|
||||||
io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer).to_delegate());
|
io.extend_with(SignerClient::new(&opt_accounts, dispatcher, &signer, event_loop.remote()).to_delegate());
|
||||||
|
|
||||||
SignerTester {
|
SignerTester {
|
||||||
signer: signer,
|
signer: signer,
|
||||||
@ -72,6 +75,7 @@ fn signer_tester() -> SignerTester {
|
|||||||
io: io,
|
io: io,
|
||||||
miner: miner,
|
miner: miner,
|
||||||
_client: client,
|
_client: client,
|
||||||
|
_event_loop: event_loop,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
//! Parity Signer-related rpc interface.
|
//! Parity Signer-related rpc interface.
|
||||||
use jsonrpc_core::Error;
|
use jsonrpc_core::Error;
|
||||||
|
use jsonrpc_pubsub::SubscriptionId;
|
||||||
|
use jsonrpc_macros::pubsub::Subscriber;
|
||||||
use futures::BoxFuture;
|
use futures::BoxFuture;
|
||||||
|
|
||||||
use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken};
|
use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, ConfirmationResponse, ConfirmationResponseWithToken};
|
||||||
@ -23,6 +25,7 @@ use v1::types::{U256, Bytes, TransactionModification, ConfirmationRequest, Confi
|
|||||||
build_rpc_trait! {
|
build_rpc_trait! {
|
||||||
/// Signer extension for confirmations rpc interface.
|
/// Signer extension for confirmations rpc interface.
|
||||||
pub trait Signer {
|
pub trait Signer {
|
||||||
|
type Metadata;
|
||||||
|
|
||||||
/// Returns a list of items to confirm.
|
/// Returns a list of items to confirm.
|
||||||
#[rpc(name = "signer_requestsToConfirm")]
|
#[rpc(name = "signer_requestsToConfirm")]
|
||||||
@ -51,5 +54,15 @@ build_rpc_trait! {
|
|||||||
/// Generates new web proxy access token.
|
/// Generates new web proxy access token.
|
||||||
#[rpc(name = "signer_generateWebProxyAccessToken")]
|
#[rpc(name = "signer_generateWebProxyAccessToken")]
|
||||||
fn generate_web_proxy_token(&self) -> Result<String, Error>;
|
fn generate_web_proxy_token(&self) -> Result<String, Error>;
|
||||||
|
|
||||||
|
#[pubsub(name = "signer_pending")] {
|
||||||
|
/// Subscribe to new pending requests on signer interface.
|
||||||
|
#[rpc(name = "signer_subscribePending")]
|
||||||
|
fn subscribe_pending(&self, Self::Metadata, Subscriber<Vec<ConfirmationRequest>>);
|
||||||
|
|
||||||
|
/// Unsubscribe from pending requests subscription.
|
||||||
|
#[rpc(name = "signer_unsubscribePending")]
|
||||||
|
fn unsubscribe_pending(&self, SubscriptionId) -> BoxFuture<bool, Error>;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,7 +127,7 @@ impl ServerBuilder {
|
|||||||
/// `WebSockets` server implementation.
|
/// `WebSockets` server implementation.
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
handle: Option<thread::JoinHandle<()>>,
|
handle: Option<thread::JoinHandle<()>>,
|
||||||
broadcaster_handle: Option<thread::JoinHandle<()>>,
|
broadcaster: ws::Sender,
|
||||||
queue: Arc<ConfirmationsQueue>,
|
queue: Arc<ConfirmationsQueue>,
|
||||||
panic_handler: Arc<PanicHandler>,
|
panic_handler: Arc<PanicHandler>,
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
@ -188,27 +188,10 @@ impl Server {
|
|||||||
}).unwrap();
|
}).unwrap();
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spawn a thread for broadcasting
|
|
||||||
let ph = panic_handler.clone();
|
|
||||||
let q = queue.clone();
|
|
||||||
let broadcaster_handle = thread::spawn(move || {
|
|
||||||
ph.catch_panic(move || {
|
|
||||||
q.start_listening(|_message| {
|
|
||||||
// TODO [ToDr] Some better structure here for messages.
|
|
||||||
broadcaster.send("new_message").unwrap();
|
|
||||||
}).expect("It's the only place we are running start_listening. It shouldn't fail.");
|
|
||||||
let res = broadcaster.shutdown();
|
|
||||||
|
|
||||||
if let Err(e) = res {
|
|
||||||
warn!("Signer: Broadcaster was not closed cleanly. Details: {:?}", e);
|
|
||||||
}
|
|
||||||
}).unwrap()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Return a handle
|
// Return a handle
|
||||||
Ok(Server {
|
Ok(Server {
|
||||||
handle: Some(handle),
|
handle: Some(handle),
|
||||||
broadcaster_handle: Some(broadcaster_handle),
|
broadcaster: broadcaster,
|
||||||
queue: queue,
|
queue: queue,
|
||||||
panic_handler: panic_handler,
|
panic_handler: panic_handler,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
@ -225,7 +208,7 @@ impl MayPanic for Server {
|
|||||||
impl Drop for Server {
|
impl Drop for Server {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.queue.finish();
|
self.queue.finish();
|
||||||
self.broadcaster_handle.take().unwrap().join().unwrap();
|
self.broadcaster.shutdown().unwrap();
|
||||||
self.handle.take().unwrap().join().unwrap();
|
self.handle.take().unwrap().join().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user