Logs Pub-Sub (#5705)

* Logs subscription.

* Add test.

* lock -> write
This commit is contained in:
Tomasz Drwięga 2017-06-28 12:21:13 +02:00 committed by Arkadiy Paronyan
parent bcc84a31b7
commit 71c68cc000
8 changed files with 356 additions and 103 deletions

View File

@ -443,7 +443,13 @@ impl LightDependencies {
} }
}, },
Api::EthPubSub => { Api::EthPubSub => {
let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); let client = EthPubSubClient::light(
self.client.clone(),
self.on_demand.clone(),
self.sync.clone(),
self.cache.clone(),
self.remote.clone(),
);
self.client.add_listener( self.client.add_listener(
Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify> Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify>
); );

View File

@ -22,6 +22,7 @@ use ethcore::basic_account::BasicAccount;
use ethcore::encoded; use ethcore::encoded;
use ethcore::executed::{Executed, ExecutionError}; use ethcore::executed::{Executed, ExecutionError};
use ethcore::ids::BlockId; use ethcore::ids::BlockId;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::transaction::{Action, Transaction as EthTransaction}; use ethcore::transaction::{Action, Transaction as EthTransaction};
use futures::{future, Future, BoxFuture}; use futures::{future, Future, BoxFuture};
@ -38,7 +39,7 @@ use ethsync::LightSync;
use util::{Address, Mutex, U256}; use util::{Address, Mutex, U256};
use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
use v1::types::{BlockNumber, CallRequest}; use v1::types::{BlockNumber, CallRequest, Log};
/// Helper for fetching blockchain data either from the light client or the network /// Helper for fetching blockchain data either from the light client or the network
/// as necessary. /// as necessary.
@ -259,4 +260,63 @@ impl LightFetch {
None => future::err(errors::network_disabled()).boxed() None => future::err(errors::network_disabled()).boxed()
} }
} }
/// get transaction logs
pub fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
use std::collections::BTreeMap;
use futures::stream::{self, Stream};
const NO_INVALID_BACK_REFS: &'static str = "Fails only on invalid back-references; back-references here known to be valid; qed";
// early exit for "to" block before "from" block.
let best_number = self.client.chain_info().best_block_number;
let block_number = |id| match id {
BlockId::Earliest => Some(0),
BlockId::Latest | BlockId::Pending => Some(best_number),
BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()),
BlockId::Number(x) => Some(x),
};
match (block_number(filter.to_block), block_number(filter.from_block)) {
(Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(),
(Some(_), Some(_)) => {},
_ => return future::err(errors::unknown_block()).boxed(),
}
let maybe_future = self.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block)
.take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block)
.take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block)
.filter(|ref hdr| {
let hdr_bloom = hdr.log_bloom();
bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some()
})
.map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into())))
.map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x)))
.collect();
// as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, receipts)| {
for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() {
if filter.matches(&log) {
matches.insert((num, block_index), log.into());
}
}
future::ok(matches)
}) // and then collect them into a vector.
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(errors::on_demand_cancel)
});
match maybe_future {
Some(fut) => fut.boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}
} }

View File

@ -95,7 +95,7 @@ impl<T> Subscribers<T> {
} }
} }
impl <T> Subscribers<Sink<T>> { impl<T> Subscribers<Sink<T>> {
/// Assigns id and adds a subscriber to the list. /// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>) { pub fn push(&mut self, sub: Subscriber<T>) {
let id = self.next_id(); let id = self.next_id();
@ -106,6 +106,17 @@ impl <T> Subscribers<Sink<T>> {
} }
} }
impl<T, V> Subscribers<(Sink<T>, V)> {
/// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>, val: V) {
let id = self.next_id();
if let Ok(sink) = sub.assign_id(SubscriptionId::String(id.as_string())) {
debug!(target: "pubsub", "Adding subscription id={:?}", id);
self.subscriptions.insert(id, (sink, val));
}
}
}
impl<T> ops::Deref for Subscribers<T> { impl<T> ops::Deref for Subscribers<T> {
type Target = HashMap<Id, T>; type Target = HashMap<Id, T>;

View File

@ -19,40 +19,51 @@
use std::sync::Arc; use std::sync::Arc;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use futures::{self, BoxFuture, Future}; use futures::{self, future, BoxFuture, Future};
use jsonrpc_core::Error; use jsonrpc_core::Error;
use jsonrpc_macros::Trailing; use jsonrpc_macros::Trailing;
use jsonrpc_macros::pubsub::{Sink, Subscriber}; use jsonrpc_macros::pubsub::{Sink, Subscriber};
use jsonrpc_pubsub::SubscriptionId; use jsonrpc_pubsub::SubscriptionId;
use v1::helpers::{errors, Subscribers}; use v1::helpers::{errors, limit_logs, Subscribers};
use v1::helpers::light_fetch::LightFetch;
use v1::metadata::Metadata; use v1::metadata::Metadata;
use v1::traits::EthPubSub; use v1::traits::EthPubSub;
use v1::types::{pubsub, RichHeader}; use v1::types::{pubsub, RichHeader, Log};
use ethcore::encoded; use ethcore::encoded;
use ethcore::filter::Filter as EthFilter;
use ethcore::client::{BlockChainClient, ChainNotify, BlockId}; use ethcore::client::{BlockChainClient, ChainNotify, BlockId};
use ethsync::LightSync;
use light::cache::Cache;
use light::on_demand::OnDemand;
use light::client::{LightChainClient, LightChainNotify}; use light::client::{LightChainClient, LightChainNotify};
use parity_reactor::Remote; use parity_reactor::Remote;
use util::{Mutex, H256, Bytes}; use util::{RwLock, Mutex, H256, Bytes};
type Client = Sink<pubsub::Result>;
/// Eth PubSub implementation. /// Eth PubSub implementation.
pub struct EthPubSubClient<C> { pub struct EthPubSubClient<C> {
handler: Arc<ChainNotificationHandler<C>>, handler: Arc<ChainNotificationHandler<C>>,
heads_subscribers: Arc<Mutex<Subscribers<Sink<pubsub::Result>>>>, heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
} }
impl<C> EthPubSubClient<C> { impl<C> EthPubSubClient<C> {
/// Creates new `EthPubSubClient`. /// Creates new `EthPubSubClient`.
pub fn new(client: Arc<C>, remote: Remote) -> Self { pub fn new(client: Arc<C>, remote: Remote) -> Self {
let heads_subscribers = Arc::new(Mutex::new(Subscribers::default())); let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
EthPubSubClient { EthPubSubClient {
handler: Arc::new(ChainNotificationHandler { handler: Arc::new(ChainNotificationHandler {
client, client,
remote, remote,
heads_subscribers: heads_subscribers.clone(), heads_subscribers: heads_subscribers.clone(),
logs_subscribers: logs_subscribers.clone(),
}), }),
heads_subscribers, heads_subscribers,
logs_subscribers,
} }
} }
@ -60,7 +71,8 @@ impl<C> EthPubSubClient<C> {
#[cfg(test)] #[cfg(test)]
pub fn new_test(client: Arc<C>, remote: Remote) -> Self { pub fn new_test(client: Arc<C>, remote: Remote) -> Self {
let client = Self::new(client, remote); let client = Self::new(client, remote);
*client.heads_subscribers.lock() = Subscribers::new_test(); *client.heads_subscribers.write() = Subscribers::new_test();
*client.logs_subscribers.write() = Subscribers::new_test();
client client
} }
@ -70,42 +82,116 @@ impl<C> EthPubSubClient<C> {
} }
} }
impl EthPubSubClient<LightFetch> {
/// Creates a new `EthPubSubClient` for `LightClient`.
pub fn light(
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
sync: Arc<LightSync>,
cache: Arc<Mutex<Cache>>,
remote: Remote,
) -> Self {
let fetch = LightFetch {
client,
on_demand,
sync,
cache
};
EthPubSubClient::new(Arc::new(fetch), remote)
}
}
/// PubSub Notification handler. /// PubSub Notification handler.
pub struct ChainNotificationHandler<C> { pub struct ChainNotificationHandler<C> {
client: Arc<C>, client: Arc<C>,
remote: Remote, remote: Remote,
heads_subscribers: Arc<Mutex<Subscribers<Sink<pubsub::Result>>>>, heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
} }
impl<C> ChainNotificationHandler<C> { impl<C> ChainNotificationHandler<C> {
fn notify(&self, blocks: Vec<(encoded::Header, BTreeMap<String, String>)>) { fn notify(remote: &Remote, subscriber: &Client, result: pubsub::Result) {
for subscriber in self.heads_subscribers.lock().values() { remote.spawn(subscriber
for &(ref block, ref extra_info) in &blocks { .notify(Ok(result))
self.remote.spawn(subscriber .map(|_| ())
.notify(Ok(pubsub::Result::Header(RichHeader { .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
inner: block.into(), );
extra_info: extra_info.clone(), }
})))
.map(|_| ()) fn notify_heads(&self, headers: &[(encoded::Header, BTreeMap<String, String>)]) {
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) for subscriber in self.heads_subscribers.read().values() {
); for &(ref header, ref extra_info) in headers {
Self::notify(&self.remote, subscriber, pubsub::Result::Header(RichHeader {
inner: header.into(),
extra_info: extra_info.clone(),
}));
} }
} }
} }
fn notify_logs<F>(&self, enacted: &[H256], logs: F) where
F: Fn(EthFilter) -> BoxFuture<Vec<Log>, Error>,
{
for &(ref subscriber, ref filter) in self.logs_subscribers.read().values() {
let logs = futures::future::join_all(enacted
.iter()
.map(|hash| {
let mut filter = filter.clone();
filter.from_block = BlockId::Hash(*hash);
filter.to_block = filter.from_block.clone();
logs(filter)
})
.collect::<Vec<_>>()
);
let limit = filter.limit;
let remote = self.remote.clone();
let subscriber = subscriber.clone();
self.remote.spawn(logs
.map(move |logs| {
let logs = logs.into_iter().flat_map(|log| log).collect();
let logs = limit_logs(logs, limit);
if !logs.is_empty() {
Self::notify(&remote, &subscriber, pubsub::Result::Logs(logs));
}
})
.map_err(|e| warn!("Unable to fetch latest logs: {:?}", e))
);
}
}
} }
impl<C: LightChainClient> LightChainNotify for ChainNotificationHandler<C> { /// A light client wrapper struct.
pub trait LightClient: Send + Sync {
/// Get a recent block header.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;
/// Fetch logs.
fn logs(&self, filter: EthFilter) -> BoxFuture<Vec<Log>, Error>;
}
impl LightClient for LightFetch {
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.client.block_header(id)
}
fn logs(&self, filter: EthFilter) -> BoxFuture<Vec<Log>, Error> {
LightFetch::logs(self, filter)
}
}
impl<C: LightClient> LightChainNotify for ChainNotificationHandler<C> {
fn new_headers( fn new_headers(
&self, &self,
headers: &[H256], enacted: &[H256],
) { ) {
let blocks = headers let headers = enacted
.iter() .iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash)))
.map(|header| (header, Default::default())) .map(|header| (header, Default::default()))
.collect(); .collect::<Vec<_>>();
self.notify(blocks); self.notify_heads(&headers);
self.notify_logs(&enacted, |filter| self.client.logs(filter))
} }
} }
@ -115,22 +201,37 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
_imported: Vec<H256>, _imported: Vec<H256>,
_invalid: Vec<H256>, _invalid: Vec<H256>,
enacted: Vec<H256>, enacted: Vec<H256>,
_retracted: Vec<H256>, retracted: Vec<H256>,
_sealed: Vec<H256>, _sealed: Vec<H256>,
// Block bytes. // Block bytes.
_proposed: Vec<Bytes>, _proposed: Vec<Bytes>,
_duration: u64, _duration: u64,
) { ) {
const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed";
let blocks = enacted let headers = enacted
.into_iter() .iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(hash))) .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash)))
.map(|header| { .map(|header| {
let hash = header.hash(); let hash = header.hash();
(header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF)) (header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF))
}) })
.collect(); .collect::<Vec<_>>();
self.notify(blocks);
// Headers
self.notify_heads(&headers);
// Enacted logs
self.notify_logs(&enacted, |filter| {
future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()).boxed()
});
// Retracted logs
self.notify_logs(&retracted, |filter| {
future::ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| {
log.log_type = "removed".into();
log
}).collect()).boxed()
});
} }
} }
@ -144,10 +245,12 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
kind: pubsub::Kind, kind: pubsub::Kind,
params: Trailing<pubsub::Params>, params: Trailing<pubsub::Params>,
) { ) {
let params: Option<pubsub::Params> = params.into(); match (kind, params.into()) {
match (kind, params) {
(pubsub::Kind::NewHeads, None) => { (pubsub::Kind::NewHeads, None) => {
self.heads_subscribers.lock().push(subscriber) self.heads_subscribers.write().push(subscriber)
},
(pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
self.logs_subscribers.write().push(subscriber, filter.into());
}, },
_ => { _ => {
let _ = subscriber.reject(errors::unimplemented(None)); let _ = subscriber.reject(errors::unimplemented(None));
@ -156,7 +259,9 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
} }
fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> { fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
let res = self.heads_subscribers.lock().remove(&id).is_some(); let res = self.heads_subscribers.write().remove(&id).is_some();
futures::future::ok(res).boxed() let res2 = self.logs_subscribers.write().remove(&id).is_some();
future::ok(res || res2).boxed()
} }
} }

View File

@ -477,60 +477,7 @@ impl Filterable for EthClient {
} }
fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> { fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
use std::collections::BTreeMap; self.fetcher().logs(filter)
use futures::stream::{self, Stream};
use util::H2048;
// early exit for "to" block before "from" block.
let best_number = self.client.chain_info().best_block_number;
let block_number = |id| match id {
BlockId::Earliest => Some(0),
BlockId::Latest | BlockId::Pending => Some(best_number),
BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()),
BlockId::Number(x) => Some(x),
};
match (block_number(filter.to_block), block_number(filter.from_block)) {
(Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(),
(Some(_), Some(_)) => {},
_ => return future::err(errors::unknown_block()).boxed(),
}
let maybe_future = self.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block)
.take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block)
.take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block)
.filter(|ref hdr| {
let hdr_bloom = hdr.log_bloom();
bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some()
})
.map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into())))
.map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x)))
.collect();
// as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, receipts)| {
for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() {
if filter.matches(&log) {
matches.insert((num, block_index), log.into());
}
}
future::ok(matches)
}) // and then collect them into a vector.
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(errors::on_demand_cancel)
});
match maybe_future {
Some(fut) => fut.boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
} }
fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec<Log> { fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec<Log> {

View File

@ -78,6 +78,78 @@ fn should_subscribe_to_new_heads() {
assert_eq!(res, None); assert_eq!(res, None);
} }
#[test]
fn should_subscribe_to_logs() {
use ethcore::log_entry::{LocalizedLogEntry, LogEntry};
use ethcore::ids::BlockId;
use ethcore::client::BlockChainClient;
// given
let el = EventLoop::spawn();
let mut client = TestBlockChainClient::new();
// Insert some blocks
client.add_blocks(1, EachBlockWith::Transaction);
let h1 = client.block_hash_delta_minus(1);
let block = client.block(BlockId::Hash(h1)).unwrap();
let tx_hash = block.transactions()[0].hash();
client.set_logs(vec![
LocalizedLogEntry {
entry: LogEntry {
address: 5.into(),
topics: vec![1.into(), 2.into(), 0.into(), 0.into()],
data: vec![],
},
block_hash: h1,
block_number: block.header().number(),
transaction_hash: tx_hash,
transaction_index: 0,
log_index: 0,
transaction_log_index: 0,
}
]);
let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote());
let handler = pubsub.handler();
let pubsub = pubsub.to_delegate();
let mut io = MetaIoHandler::default();
io.extend_with(pubsub);
let mut metadata = Metadata::default();
let (sender, receiver) = futures::sync::mpsc::channel(8);
metadata.session = Some(Arc::new(Session::new(sender)));
// Subscribe
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["logs", {}], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#;
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
// Check notifications (enacted)
handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], 0);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":[{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:?}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));
// Check notifications (retracted)
handler.new_blocks(vec![], vec![], vec![], vec![h1], vec![], vec![], 0);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":[{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:?}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"removed"}],"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));
// And unsubscribe
let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#;
let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;
assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned()));
let (res, _receiver) = receiver.into_future().wait().unwrap();
assert_eq!(res, None);
}
#[test] #[test]
fn should_return_unimplemented() { fn should_return_unimplemented() {
// given // given
@ -97,8 +169,6 @@ fn should_return_unimplemented() {
let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#; let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#;
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#; let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#;
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["logs"], "id": 1}"#;
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#; let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#;
assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned()));
} }

View File

@ -100,7 +100,7 @@ pub struct Block {
} }
/// Block header representation. /// Block header representation.
#[derive(Debug, Serialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct Header { pub struct Header {
/// Hash of the block /// Hash of the block
pub hash: Option<H256>, pub hash: Option<H256>,
@ -186,7 +186,7 @@ pub type RichBlock = Rich<Block>;
pub type RichHeader = Rich<Header>; pub type RichHeader = Rich<Header>;
/// Value representation with additional info /// Value representation with additional info
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct Rich<T> { pub struct Rich<T> {
/// Standard value. /// Standard value.
pub inner: T, pub inner: T,

View File

@ -16,14 +16,18 @@
//! Pub-Sub types. //! Pub-Sub types.
use serde::{Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use v1::types::{RichHeader, Filter}; use serde::de::Error;
use serde_json::{Value, from_value};
use v1::types::{RichHeader, Filter, Log};
/// Subscription result. /// Subscription result.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum Result { pub enum Result {
/// New block header. /// New block header.
Header(RichHeader), Header(RichHeader),
/// Logs
Logs(Vec<Log>),
} }
impl Serialize for Result { impl Serialize for Result {
@ -32,6 +36,7 @@ impl Serialize for Result {
{ {
match *self { match *self {
Result::Header(ref header) => header.serialize(serializer), Result::Header(ref header) => header.serialize(serializer),
Result::Logs(ref logs) => logs.serialize(serializer),
} }
} }
} }
@ -55,8 +60,7 @@ pub enum Kind {
} }
/// Subscription kind. /// Subscription kind.
#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)] #[derive(Debug, PartialEq, Eq, Hash, Clone)]
#[serde(deny_unknown_fields)]
pub enum Params { pub enum Params {
/// No parameters passed. /// No parameters passed.
None, None,
@ -70,11 +74,26 @@ impl Default for Params {
} }
} }
impl Deserialize for Params {
fn deserialize<D>(deserializer: D) -> ::std::result::Result<Params, D::Error>
where D: Deserializer {
let v: Value = Deserialize::deserialize(deserializer)?;
if v.is_null() {
return Ok(Params::None);
}
from_value(v.clone()).map(Params::Logs)
.map_err(|_| D::Error::custom("Invalid type."))
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use serde_json; use serde_json;
use super::{Result, Kind}; use super::{Result, Kind, Params};
use v1::types::{RichHeader, Header}; use v1::types::{RichHeader, Header, Filter};
use v1::types::filter::VariadicValue;
#[test] #[test]
fn should_deserialize_kind() { fn should_deserialize_kind() {
@ -84,6 +103,41 @@ mod tests {
assert_eq!(serde_json::from_str::<Kind>(r#""syncing""#).unwrap(), Kind::Syncing); assert_eq!(serde_json::from_str::<Kind>(r#""syncing""#).unwrap(), Kind::Syncing);
} }
#[test]
fn should_deserialize_logs() {
let none = serde_json::from_str::<Params>(r#"null"#).unwrap();
assert_eq!(none, Params::None);
let logs1 = serde_json::from_str::<Params>(r#"{}"#).unwrap();
let logs2 = serde_json::from_str::<Params>(r#"{"limit":10}"#).unwrap();
let logs3 = serde_json::from_str::<Params>(
r#"{"topics":["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b"]}"#
).unwrap();
assert_eq!(logs1, Params::Logs(Filter {
from_block: None,
to_block: None,
address: None,
topics: None,
limit: None,
}));
assert_eq!(logs2, Params::Logs(Filter {
from_block: None,
to_block: None,
address: None,
topics: None,
limit: Some(10),
}));
assert_eq!(logs3, Params::Logs(Filter {
from_block: None,
to_block: None,
address: None,
topics: Some(vec![
VariadicValue::Single("000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b".parse().unwrap()
)]),
limit: None,
}));
}
#[test] #[test]
fn should_serialize_header() { fn should_serialize_header() {
let header = Result::Header(RichHeader { let header = Result::Header(RichHeader {