From 71c68cc000a09346d691ce723343b4870d93b541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 28 Jun 2017 12:21:13 +0200 Subject: [PATCH] Logs Pub-Sub (#5705) * Logs subscription. * Add test. * lock -> write --- parity/rpc_apis.rs | 8 +- rpc/src/v1/helpers/light_fetch.rs | 62 ++++++++- rpc/src/v1/helpers/subscribers.rs | 13 +- rpc/src/v1/impls/eth_pubsub.rs | 175 ++++++++++++++++++++------ rpc/src/v1/impls/light/eth.rs | 55 +------- rpc/src/v1/tests/mocked/eth_pubsub.rs | 74 ++++++++++- rpc/src/v1/types/block.rs | 4 +- rpc/src/v1/types/pubsub.rs | 68 ++++++++-- 8 files changed, 356 insertions(+), 103 deletions(-) diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index ec76bf5f4..d1b0253d5 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -443,7 +443,13 @@ impl LightDependencies { } }, Api::EthPubSub => { - let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); + let client = EthPubSubClient::light( + self.client.clone(), + self.on_demand.clone(), + self.sync.clone(), + self.cache.clone(), + self.remote.clone(), + ); self.client.add_listener( Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify> ); diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 2655f7351..9ecdffe80 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -22,6 +22,7 @@ use ethcore::basic_account::BasicAccount; use ethcore::encoded; use ethcore::executed::{Executed, ExecutionError}; use ethcore::ids::BlockId; +use ethcore::filter::Filter as EthcoreFilter; use ethcore::transaction::{Action, Transaction as EthTransaction}; use futures::{future, Future, BoxFuture}; @@ -38,7 +39,7 @@ use ethsync::LightSync; use util::{Address, Mutex, U256}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; -use v1::types::{BlockNumber, CallRequest}; +use v1::types::{BlockNumber, CallRequest, Log}; /// Helper for fetching blockchain data either from the light client or the network /// as necessary. @@ -259,4 +260,63 @@ impl LightFetch { None => future::err(errors::network_disabled()).boxed() } } + + /// get transaction logs + pub fn logs(&self, filter: EthcoreFilter) -> BoxFuture, Error> { + use std::collections::BTreeMap; + + use futures::stream::{self, Stream}; + + const NO_INVALID_BACK_REFS: &'static str = "Fails only on invalid back-references; back-references here known to be valid; qed"; + + // early exit for "to" block before "from" block. + let best_number = self.client.chain_info().best_block_number; + let block_number = |id| match id { + BlockId::Earliest => Some(0), + BlockId::Latest | BlockId::Pending => Some(best_number), + BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()), + BlockId::Number(x) => Some(x), + }; + + match (block_number(filter.to_block), block_number(filter.from_block)) { + (Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(), + (Some(_), Some(_)) => {}, + _ => return future::err(errors::unknown_block()).boxed(), + } + + let maybe_future = self.sync.with_context(move |ctx| { + // find all headers which match the filter, and fetch the receipts for each one. + // match them with their numbers for easy sorting later. + let bit_combos = filter.bloom_possibilities(); + let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) + .take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block) + .take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block) + .filter(|ref hdr| { + let hdr_bloom = hdr.log_bloom(); + bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some() + }) + .map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into()))) + .map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x))) + .collect(); + + // as the receipts come in, find logs within them which match the filter. + // insert them into a BTreeMap to maintain order by number and block index. + stream::futures_unordered(receipts_futures) + .fold(BTreeMap::new(), move |mut matches, (num, receipts)| { + for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() { + if filter.matches(&log) { + matches.insert((num, block_index), log.into()); + } + } + future::ok(matches) + }) // and then collect them into a vector. + .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) + .map_err(errors::on_demand_cancel) + }); + + match maybe_future { + Some(fut) => fut.boxed(), + None => future::err(errors::network_disabled()).boxed(), + } + } } diff --git a/rpc/src/v1/helpers/subscribers.rs b/rpc/src/v1/helpers/subscribers.rs index e1fc42ae4..11dd45d11 100644 --- a/rpc/src/v1/helpers/subscribers.rs +++ b/rpc/src/v1/helpers/subscribers.rs @@ -95,7 +95,7 @@ impl Subscribers { } } -impl Subscribers> { +impl Subscribers> { /// Assigns id and adds a subscriber to the list. pub fn push(&mut self, sub: Subscriber) { let id = self.next_id(); @@ -106,6 +106,17 @@ impl Subscribers> { } } +impl Subscribers<(Sink, V)> { + /// Assigns id and adds a subscriber to the list. + pub fn push(&mut self, sub: Subscriber, val: V) { + let id = self.next_id(); + if let Ok(sink) = sub.assign_id(SubscriptionId::String(id.as_string())) { + debug!(target: "pubsub", "Adding subscription id={:?}", id); + self.subscriptions.insert(id, (sink, val)); + } + } +} + impl ops::Deref for Subscribers { type Target = HashMap; diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 2e513560c..6ba88be21 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -19,40 +19,51 @@ use std::sync::Arc; use std::collections::BTreeMap; -use futures::{self, BoxFuture, Future}; +use futures::{self, future, BoxFuture, Future}; use jsonrpc_core::Error; use jsonrpc_macros::Trailing; use jsonrpc_macros::pubsub::{Sink, Subscriber}; use jsonrpc_pubsub::SubscriptionId; -use v1::helpers::{errors, Subscribers}; +use v1::helpers::{errors, limit_logs, Subscribers}; +use v1::helpers::light_fetch::LightFetch; use v1::metadata::Metadata; use v1::traits::EthPubSub; -use v1::types::{pubsub, RichHeader}; +use v1::types::{pubsub, RichHeader, Log}; use ethcore::encoded; +use ethcore::filter::Filter as EthFilter; use ethcore::client::{BlockChainClient, ChainNotify, BlockId}; +use ethsync::LightSync; +use light::cache::Cache; +use light::on_demand::OnDemand; use light::client::{LightChainClient, LightChainNotify}; use parity_reactor::Remote; -use util::{Mutex, H256, Bytes}; +use util::{RwLock, Mutex, H256, Bytes}; + +type Client = Sink; /// Eth PubSub implementation. pub struct EthPubSubClient { handler: Arc>, - heads_subscribers: Arc>>>, + heads_subscribers: Arc>>, + logs_subscribers: Arc>>, } impl EthPubSubClient { /// Creates new `EthPubSubClient`. pub fn new(client: Arc, remote: Remote) -> Self { - let heads_subscribers = Arc::new(Mutex::new(Subscribers::default())); + let heads_subscribers = Arc::new(RwLock::new(Subscribers::default())); + let logs_subscribers = Arc::new(RwLock::new(Subscribers::default())); EthPubSubClient { handler: Arc::new(ChainNotificationHandler { client, remote, heads_subscribers: heads_subscribers.clone(), + logs_subscribers: logs_subscribers.clone(), }), heads_subscribers, + logs_subscribers, } } @@ -60,7 +71,8 @@ impl EthPubSubClient { #[cfg(test)] pub fn new_test(client: Arc, remote: Remote) -> Self { let client = Self::new(client, remote); - *client.heads_subscribers.lock() = Subscribers::new_test(); + *client.heads_subscribers.write() = Subscribers::new_test(); + *client.logs_subscribers.write() = Subscribers::new_test(); client } @@ -70,42 +82,116 @@ impl EthPubSubClient { } } +impl EthPubSubClient { + /// Creates a new `EthPubSubClient` for `LightClient`. + pub fn light( + client: Arc, + on_demand: Arc, + sync: Arc, + cache: Arc>, + remote: Remote, + ) -> Self { + let fetch = LightFetch { + client, + on_demand, + sync, + cache + }; + EthPubSubClient::new(Arc::new(fetch), remote) + } +} + /// PubSub Notification handler. pub struct ChainNotificationHandler { client: Arc, remote: Remote, - heads_subscribers: Arc>>>, + heads_subscribers: Arc>>, + logs_subscribers: Arc>>, } impl ChainNotificationHandler { - fn notify(&self, blocks: Vec<(encoded::Header, BTreeMap)>) { - for subscriber in self.heads_subscribers.lock().values() { - for &(ref block, ref extra_info) in &blocks { - self.remote.spawn(subscriber - .notify(Ok(pubsub::Result::Header(RichHeader { - inner: block.into(), - extra_info: extra_info.clone(), - }))) - .map(|_| ()) - .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) - ); + fn notify(remote: &Remote, subscriber: &Client, result: pubsub::Result) { + remote.spawn(subscriber + .notify(Ok(result)) + .map(|_| ()) + .map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e)) + ); + } + + fn notify_heads(&self, headers: &[(encoded::Header, BTreeMap)]) { + for subscriber in self.heads_subscribers.read().values() { + for &(ref header, ref extra_info) in headers { + Self::notify(&self.remote, subscriber, pubsub::Result::Header(RichHeader { + inner: header.into(), + extra_info: extra_info.clone(), + })); } } } + + fn notify_logs(&self, enacted: &[H256], logs: F) where + F: Fn(EthFilter) -> BoxFuture, Error>, + { + for &(ref subscriber, ref filter) in self.logs_subscribers.read().values() { + let logs = futures::future::join_all(enacted + .iter() + .map(|hash| { + let mut filter = filter.clone(); + filter.from_block = BlockId::Hash(*hash); + filter.to_block = filter.from_block.clone(); + logs(filter) + }) + .collect::>() + ); + let limit = filter.limit; + let remote = self.remote.clone(); + let subscriber = subscriber.clone(); + self.remote.spawn(logs + .map(move |logs| { + let logs = logs.into_iter().flat_map(|log| log).collect(); + let logs = limit_logs(logs, limit); + if !logs.is_empty() { + Self::notify(&remote, &subscriber, pubsub::Result::Logs(logs)); + } + }) + .map_err(|e| warn!("Unable to fetch latest logs: {:?}", e)) + ); + } + } } -impl LightChainNotify for ChainNotificationHandler { +/// A light client wrapper struct. +pub trait LightClient: Send + Sync { + /// Get a recent block header. + fn block_header(&self, id: BlockId) -> Option; + + /// Fetch logs. + fn logs(&self, filter: EthFilter) -> BoxFuture, Error>; +} + +impl LightClient for LightFetch { + fn block_header(&self, id: BlockId) -> Option { + self.client.block_header(id) + } + + fn logs(&self, filter: EthFilter) -> BoxFuture, Error> { + LightFetch::logs(self, filter) + } +} + +impl LightChainNotify for ChainNotificationHandler { fn new_headers( &self, - headers: &[H256], + enacted: &[H256], ) { - let blocks = headers + let headers = enacted .iter() .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) .map(|header| (header, Default::default())) - .collect(); + .collect::>(); - self.notify(blocks); + self.notify_heads(&headers); + self.notify_logs(&enacted, |filter| self.client.logs(filter)) } } @@ -115,22 +201,37 @@ impl ChainNotify for ChainNotificationHandler { _imported: Vec, _invalid: Vec, enacted: Vec, - _retracted: Vec, + retracted: Vec, _sealed: Vec, // Block bytes. _proposed: Vec, _duration: u64, ) { const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed"; - let blocks = enacted - .into_iter() - .filter_map(|hash| self.client.block_header(BlockId::Hash(hash))) + let headers = enacted + .iter() + .filter_map(|hash| self.client.block_header(BlockId::Hash(*hash))) .map(|header| { let hash = header.hash(); (header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF)) }) - .collect(); - self.notify(blocks); + .collect::>(); + + // Headers + self.notify_heads(&headers); + + // Enacted logs + self.notify_logs(&enacted, |filter| { + future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()).boxed() + }); + + // Retracted logs + self.notify_logs(&retracted, |filter| { + future::ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| { + log.log_type = "removed".into(); + log + }).collect()).boxed() + }); } } @@ -144,10 +245,12 @@ impl EthPubSub for EthPubSubClient { kind: pubsub::Kind, params: Trailing, ) { - let params: Option = params.into(); - match (kind, params) { + match (kind, params.into()) { (pubsub::Kind::NewHeads, None) => { - self.heads_subscribers.lock().push(subscriber) + self.heads_subscribers.write().push(subscriber) + }, + (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => { + self.logs_subscribers.write().push(subscriber, filter.into()); }, _ => { let _ = subscriber.reject(errors::unimplemented(None)); @@ -156,7 +259,9 @@ impl EthPubSub for EthPubSubClient { } fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture { - let res = self.heads_subscribers.lock().remove(&id).is_some(); - futures::future::ok(res).boxed() + let res = self.heads_subscribers.write().remove(&id).is_some(); + let res2 = self.logs_subscribers.write().remove(&id).is_some(); + + future::ok(res || res2).boxed() } } diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index dac63005e..e133b3e7a 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -477,60 +477,7 @@ impl Filterable for EthClient { } fn logs(&self, filter: EthcoreFilter) -> BoxFuture, Error> { - use std::collections::BTreeMap; - - use futures::stream::{self, Stream}; - use util::H2048; - - // early exit for "to" block before "from" block. - let best_number = self.client.chain_info().best_block_number; - let block_number = |id| match id { - BlockId::Earliest => Some(0), - BlockId::Latest | BlockId::Pending => Some(best_number), - BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()), - BlockId::Number(x) => Some(x), - }; - - match (block_number(filter.to_block), block_number(filter.from_block)) { - (Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(), - (Some(_), Some(_)) => {}, - _ => return future::err(errors::unknown_block()).boxed(), - } - - let maybe_future = self.sync.with_context(move |ctx| { - // find all headers which match the filter, and fetch the receipts for each one. - // match them with their numbers for easy sorting later. - let bit_combos = filter.bloom_possibilities(); - let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block) - .take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block) - .take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block) - .filter(|ref hdr| { - let hdr_bloom = hdr.log_bloom(); - bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some() - }) - .map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into()))) - .map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x))) - .collect(); - - // as the receipts come in, find logs within them which match the filter. - // insert them into a BTreeMap to maintain order by number and block index. - stream::futures_unordered(receipts_futures) - .fold(BTreeMap::new(), move |mut matches, (num, receipts)| { - for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() { - if filter.matches(&log) { - matches.insert((num, block_index), log.into()); - } - } - future::ok(matches) - }) // and then collect them into a vector. - .map(|matches| matches.into_iter().map(|(_, v)| v).collect()) - .map_err(errors::on_demand_cancel) - }); - - match maybe_future { - Some(fut) => fut.boxed(), - None => future::err(errors::network_disabled()).boxed(), - } + self.fetcher().logs(filter) } fn pending_logs(&self, _block_number: u64, _filter: &EthcoreFilter) -> Vec { diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index b31d78796..199830025 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -78,6 +78,78 @@ fn should_subscribe_to_new_heads() { assert_eq!(res, None); } +#[test] +fn should_subscribe_to_logs() { + use ethcore::log_entry::{LocalizedLogEntry, LogEntry}; + use ethcore::ids::BlockId; + use ethcore::client::BlockChainClient; + + // given + let el = EventLoop::spawn(); + let mut client = TestBlockChainClient::new(); + // Insert some blocks + client.add_blocks(1, EachBlockWith::Transaction); + let h1 = client.block_hash_delta_minus(1); + let block = client.block(BlockId::Hash(h1)).unwrap(); + let tx_hash = block.transactions()[0].hash(); + client.set_logs(vec![ + LocalizedLogEntry { + entry: LogEntry { + address: 5.into(), + topics: vec![1.into(), 2.into(), 0.into(), 0.into()], + data: vec![], + }, + block_hash: h1, + block_number: block.header().number(), + transaction_hash: tx_hash, + transaction_index: 0, + log_index: 0, + transaction_log_index: 0, + } + ]); + + let pubsub = EthPubSubClient::new_test(Arc::new(client), el.remote()); + let handler = pubsub.handler(); + let pubsub = pubsub.to_delegate(); + + let mut io = MetaIoHandler::default(); + io.extend_with(pubsub); + + let mut metadata = Metadata::default(); + let (sender, receiver) = futures::sync::mpsc::channel(8); + metadata.session = Some(Arc::new(Session::new(sender))); + + // Subscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["logs", {}], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":"0x416d77337e24399d","id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); + + // Check notifications (enacted) + handler.new_blocks(vec![], vec![], vec![h1], vec![], vec![], vec![], 0); + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":[{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + + &format!("0x{:?}", tx_hash) + + r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"subscription":"0x416d77337e24399d"}}"#; + assert_eq!(res, Some(response.into())); + + // Check notifications (retracted) + handler.new_blocks(vec![], vec![], vec![], vec![h1], vec![], vec![], 0); + let (res, receiver) = receiver.into_future().wait().unwrap(); + let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":[{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned() + + &format!("0x{:?}", tx_hash) + + r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"removed"}],"subscription":"0x416d77337e24399d"}}"#; + assert_eq!(res, Some(response.into())); + + + // And unsubscribe + let request = r#"{"jsonrpc": "2.0", "method": "eth_unsubscribe", "params": ["0x416d77337e24399d"], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","result":true,"id":1}"#; + assert_eq!(io.handle_request_sync(request, metadata), Some(response.to_owned())); + + let (res, _receiver) = receiver.into_future().wait().unwrap(); + assert_eq!(res, None); +} + #[test] fn should_return_unimplemented() { // given @@ -97,8 +169,6 @@ fn should_return_unimplemented() { let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"This request is not implemented yet. Please create an issue on Github repo."},"id":1}"#; let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["newPendingTransactions"], "id": 1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); - let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["logs"], "id": 1}"#; - assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); let request = r#"{"jsonrpc": "2.0", "method": "eth_subscribe", "params": ["syncing"], "id": 1}"#; assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/block.rs b/rpc/src/v1/types/block.rs index 50a081356..525d2c090 100644 --- a/rpc/src/v1/types/block.rs +++ b/rpc/src/v1/types/block.rs @@ -100,7 +100,7 @@ pub struct Block { } /// Block header representation. -#[derive(Debug, Serialize, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, PartialEq, Eq)] pub struct Header { /// Hash of the block pub hash: Option, @@ -186,7 +186,7 @@ pub type RichBlock = Rich; pub type RichHeader = Rich
; /// Value representation with additional info -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct Rich { /// Standard value. pub inner: T, diff --git a/rpc/src/v1/types/pubsub.rs b/rpc/src/v1/types/pubsub.rs index 8bc4f9079..1be0d19e3 100644 --- a/rpc/src/v1/types/pubsub.rs +++ b/rpc/src/v1/types/pubsub.rs @@ -16,14 +16,18 @@ //! Pub-Sub types. -use serde::{Serialize, Serializer}; -use v1::types::{RichHeader, Filter}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::de::Error; +use serde_json::{Value, from_value}; +use v1::types::{RichHeader, Filter, Log}; /// Subscription result. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum Result { /// New block header. Header(RichHeader), + /// Logs + Logs(Vec), } impl Serialize for Result { @@ -32,6 +36,7 @@ impl Serialize for Result { { match *self { Result::Header(ref header) => header.serialize(serializer), + Result::Logs(ref logs) => logs.serialize(serializer), } } } @@ -55,8 +60,7 @@ pub enum Kind { } /// Subscription kind. -#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)] -#[serde(deny_unknown_fields)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum Params { /// No parameters passed. None, @@ -70,11 +74,26 @@ impl Default for Params { } } +impl Deserialize for Params { + fn deserialize(deserializer: D) -> ::std::result::Result + where D: Deserializer { + let v: Value = Deserialize::deserialize(deserializer)?; + + if v.is_null() { + return Ok(Params::None); + } + + from_value(v.clone()).map(Params::Logs) + .map_err(|_| D::Error::custom("Invalid type.")) + } +} + #[cfg(test)] mod tests { use serde_json; - use super::{Result, Kind}; - use v1::types::{RichHeader, Header}; + use super::{Result, Kind, Params}; + use v1::types::{RichHeader, Header, Filter}; + use v1::types::filter::VariadicValue; #[test] fn should_deserialize_kind() { @@ -84,6 +103,41 @@ mod tests { assert_eq!(serde_json::from_str::(r#""syncing""#).unwrap(), Kind::Syncing); } + #[test] + fn should_deserialize_logs() { + let none = serde_json::from_str::(r#"null"#).unwrap(); + assert_eq!(none, Params::None); + + let logs1 = serde_json::from_str::(r#"{}"#).unwrap(); + let logs2 = serde_json::from_str::(r#"{"limit":10}"#).unwrap(); + let logs3 = serde_json::from_str::( + r#"{"topics":["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b"]}"# + ).unwrap(); + assert_eq!(logs1, Params::Logs(Filter { + from_block: None, + to_block: None, + address: None, + topics: None, + limit: None, + })); + assert_eq!(logs2, Params::Logs(Filter { + from_block: None, + to_block: None, + address: None, + topics: None, + limit: Some(10), + })); + assert_eq!(logs3, Params::Logs(Filter { + from_block: None, + to_block: None, + address: None, + topics: Some(vec![ + VariadicValue::Single("000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b".parse().unwrap() + )]), + limit: None, + })); + } + #[test] fn should_serialize_header() { let header = Result::Header(RichHeader {