diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index efc8b3f2e..84bcddfd6 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1813,76 +1813,100 @@ impl BlockChainClient for Client { self.engine.additional_params().into_iter().collect() } - fn logs(&self, filter: Filter) -> Vec { - // Wrap the logic inside a closure so that we can take advantage of question mark syntax. - let fetch_logs = || { - let chain = self.chain.read(); + fn logs(&self, filter: Filter) -> Result, BlockId> { + let chain = self.chain.read(); - // First, check whether `filter.from_block` and `filter.to_block` is on the canon chain. If so, we can use the - // optimized version. - let is_canon = |id| { - match id { - // If it is referred by number, then it is always on the canon chain. - &BlockId::Earliest | &BlockId::Latest | &BlockId::Number(_) => true, - // If it is referred by hash, we see whether a hash -> number -> hash conversion gives us the same - // result. - &BlockId::Hash(ref hash) => chain.is_canon(hash), - } - }; - - let blocks = if is_canon(&filter.from_block) && is_canon(&filter.to_block) { - // If we are on the canon chain, use bloom filter to fetch required hashes. - let from = self.block_number_ref(&filter.from_block)?; - let to = self.block_number_ref(&filter.to_block)?; - - chain.blocks_with_bloom(&filter.bloom_possibilities(), from, to) - .into_iter() - .filter_map(|n| chain.block_hash(n)) - .collect::>() - } else { - // Otherwise, we use a slower version that finds a link between from_block and to_block. - let from_hash = Self::block_hash(&chain, filter.from_block)?; - let from_number = chain.block_number(&from_hash)?; - let to_hash = Self::block_hash(&chain, filter.to_block)?; - - let blooms = filter.bloom_possibilities(); - let bloom_match = |header: &encoded::Header| { - blooms.iter().any(|bloom| header.log_bloom().contains_bloom(bloom)) - }; - - let (blocks, last_hash) = { - let mut blocks = Vec::new(); - let mut current_hash = to_hash; - - loop { - let header = chain.block_header_data(¤t_hash)?; - if bloom_match(&header) { - blocks.push(current_hash); - } - - // Stop if `from` block is reached. - if header.number() <= from_number { - break; - } - current_hash = header.parent_hash(); - } - - blocks.reverse(); - (blocks, current_hash) - }; - - // Check if we've actually reached the expected `from` block. - if last_hash != from_hash || blocks.is_empty() { - return None; - } - - blocks - }; - - Some(self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit)) + // First, check whether `filter.from_block` and `filter.to_block` is on the canon chain. If so, we can use the + // optimized version. + let is_canon = |id| { + match id { + // If it is referred by number, then it is always on the canon chain. + &BlockId::Earliest | &BlockId::Latest | &BlockId::Number(_) => true, + // If it is referred by hash, we see whether a hash -> number -> hash conversion gives us the same + // result. + &BlockId::Hash(ref hash) => chain.is_canon(hash), + } }; - fetch_logs().unwrap_or_default() + let blocks = if is_canon(&filter.from_block) && is_canon(&filter.to_block) { + // If we are on the canon chain, use bloom filter to fetch required hashes. + // + // If we are sure the block does not exist (where val > best_block_number), then return error. Note that we + // don't need to care about pending blocks here because RPC query sets pending back to latest (or handled + // pending logs themselves). + let from = match self.block_number_ref(&filter.from_block) { + Some(val) if val <= chain.best_block_number() => val, + _ => return Err(filter.from_block.clone()), + }; + let to = match self.block_number_ref(&filter.to_block) { + Some(val) if val <= chain.best_block_number() => val, + _ => return Err(filter.to_block.clone()), + }; + + // If from is greater than to, then the current bloom filter behavior is to just return empty + // result. There's no point to continue here. + if from > to { + return Err(filter.to_block.clone()); + } + + chain.blocks_with_bloom(&filter.bloom_possibilities(), from, to) + .into_iter() + .filter_map(|n| chain.block_hash(n)) + .collect::>() + } else { + // Otherwise, we use a slower version that finds a link between from_block and to_block. + let from_hash = match Self::block_hash(&chain, filter.from_block) { + Some(val) => val, + None => return Err(filter.from_block.clone()), + }; + let from_number = match chain.block_number(&from_hash) { + Some(val) => val, + None => return Err(BlockId::Hash(from_hash)), + }; + let to_hash = match Self::block_hash(&chain, filter.to_block) { + Some(val) => val, + None => return Err(filter.to_block.clone()), + }; + + let blooms = filter.bloom_possibilities(); + let bloom_match = |header: &encoded::Header| { + blooms.iter().any(|bloom| header.log_bloom().contains_bloom(bloom)) + }; + + let (blocks, last_hash) = { + let mut blocks = Vec::new(); + let mut current_hash = to_hash; + + loop { + let header = match chain.block_header_data(¤t_hash) { + Some(val) => val, + None => return Err(BlockId::Hash(current_hash)), + }; + if bloom_match(&header) { + blocks.push(current_hash); + } + + // Stop if `from` block is reached. + if header.number() <= from_number { + break; + } + current_hash = header.parent_hash(); + } + + blocks.reverse(); + (blocks, current_hash) + }; + + // Check if we've actually reached the expected `from` block. + if last_hash != from_hash || blocks.is_empty() { + // In this case, from_hash is the cause (for not matching last_hash). + return Err(BlockId::Hash(from_hash)); + } + + blocks + }; + + Ok(self.chain.read().logs(blocks, |entry| filter.matches(entry), filter.limit)) } fn filter_traces(&self, filter: TraceFilter) -> Option> { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index f729b15b7..9981f8d2c 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -94,6 +94,8 @@ pub struct TestBlockChainClient { pub receipts: RwLock>, /// Logs pub logs: RwLock>, + /// Should return errors on logs. + pub error_on_logs: RwLock>, /// Block queue size. pub queue_size: AtomicUsize, /// Miner @@ -178,6 +180,7 @@ impl TestBlockChainClient { traces: RwLock::new(None), history: RwLock::new(None), disabled: AtomicBool::new(false), + error_on_logs: RwLock::new(None), }; // insert genesis hash. @@ -233,6 +236,11 @@ impl TestBlockChainClient { *self.logs.write() = logs; } + /// Set return errors on logs. + pub fn set_error_on_logs(&self, val: Option) { + *self.error_on_logs.write() = val; + } + /// Add blocks to test client. pub fn add_blocks(&self, count: usize, with: EachBlockWith) { let len = self.numbers.read().len(); @@ -665,13 +673,18 @@ impl BlockChainClient for TestBlockChainClient { self.receipts.read().get(&id).cloned() } - fn logs(&self, filter: Filter) -> Vec { + fn logs(&self, filter: Filter) -> Result, BlockId> { + match self.error_on_logs.read().as_ref() { + Some(id) => return Err(id.clone()), + None => (), + } + let mut logs = self.logs.read().clone(); let len = logs.len(); - match filter.limit { + Ok(match filter.limit { Some(limit) if limit <= len => logs.split_off(len - limit), _ => logs, - } + }) } fn last_hashes(&self) -> LastHashes { diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 6ccba5e0f..189ca67f4 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -297,8 +297,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra /// Get the registrar address, if it exists. fn additional_params(&self) -> BTreeMap; - /// Returns logs matching given filter. - fn logs(&self, filter: Filter) -> Vec; + /// Returns logs matching given filter. If one of the filtering block cannot be found, returns the block id that caused the error. + fn logs(&self, filter: Filter) -> Result, BlockId>; /// Replays a given transaction for inspection. fn replay(&self, t: TransactionId, analytics: CallAnalytics) -> Result; diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 24801cb57..8f598a6c2 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -150,7 +150,7 @@ fn returns_logs() { address: None, topics: vec![], limit: None, - }); + }).unwrap(); assert_eq!(logs.len(), 0); } @@ -164,7 +164,7 @@ fn returns_logs_with_limit() { address: None, topics: vec![], limit: None, - }); + }).unwrap(); assert_eq!(logs.len(), 0); } diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 710f7d749..4afd40ff8 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -20,6 +20,7 @@ use std::fmt; use ethcore::account_provider::{SignError as AccountError}; use ethcore::error::{Error as EthcoreError, ErrorKind, CallError}; +use ethcore::client::BlockId; use jsonrpc_core::{futures, Error, ErrorCode, Value}; use rlp::DecoderError; use transaction::Error as TransactionError; @@ -422,6 +423,19 @@ pub fn filter_not_found() -> Error { } } +pub fn filter_block_not_found(id: BlockId) -> Error { + Error { + code: ErrorCode::ServerError(codes::UNSUPPORTED_REQUEST), // Specified in EIP-234. + message: "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found".into(), + data: Some(Value::String(match id { + BlockId::Hash(hash) => format!("0x{:x}", hash), + BlockId::Number(number) => format!("0x{:x}", number), + BlockId::Earliest => "earliest".to_string(), + BlockId::Latest => "latest".to_string(), + })), + } +} + // on-demand sender cancelled. pub fn on_demand_cancel(_cancel: futures::sync::oneshot::Canceled) -> Error { internal("on-demand sender cancelled", "") diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs index 19979c814..48df7ca2a 100644 --- a/rpc/src/v1/helpers/poll_filter.rs +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -22,7 +22,8 @@ use std::{ }; use ethereum_types::H256; use parking_lot::Mutex; -use v1::types::{Filter, Log}; +use ethcore::filter::Filter; +use v1::types::Log; pub type BlockNumber = u64; @@ -52,7 +53,13 @@ pub enum PollFilter { /// Hashes of all pending transactions the client knows about. PendingTransaction(BTreeSet), /// Number of From block number, last seen block hash, pending logs and log filter itself. - Logs(BlockNumber, Option, HashSet, Filter) + Logs { + block_number: BlockNumber, + last_block_hash: Option, + previous_logs: HashSet, + filter: Filter, + include_pending: bool, + } } /// Returns only last `n` logs diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 67dc640d8..5c45c440f 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -708,11 +708,17 @@ impl Eth for EthClient< fn logs(&self, filter: Filter) -> BoxFuture> { let include_pending = filter.to_block == Some(BlockNumber::Pending); - let filter: EthcoreFilter = filter.into(); - let mut logs = self.client.logs(filter.clone()) - .into_iter() - .map(From::from) - .collect::>(); + let filter: EthcoreFilter = match filter.try_into() { + Ok(value) => value, + Err(err) => return Box::new(future::err(err)), + }; + let mut logs = match self.client.logs(filter.clone()) { + Ok(logs) => logs + .into_iter() + .map(From::from) + .collect::>(), + Err(id) => return Box::new(future::err(errors::filter_block_not_found(id))), + }; if include_pending { let best_block = self.client.chain_info().best_block_number; diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 926439cfc..7bccc46d1 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -92,7 +92,7 @@ impl Filterable for EthFilterClient where } fn logs(&self, filter: EthcoreFilter) -> BoxFuture> { - Box::new(future::ok(self.client.logs(filter).into_iter().map(Into::into).collect())) + Box::new(future::ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect())) } fn pending_logs(&self, block_number: u64, filter: &EthcoreFilter) -> Vec { @@ -125,7 +125,7 @@ impl Filterable for EthFilterClient where filter.from_block = BlockId::Hash(block_hash); filter.to_block = filter.from_block; - self.client.logs(filter).into_iter().map(|log| { + self.client.logs(filter).unwrap_or_default().into_iter().map(|log| { let mut log: Log = log.into(); log.log_type = "removed".into(); log.removed = true; @@ -140,7 +140,13 @@ impl EthFilter for T { fn new_filter(&self, filter: Filter) -> Result { let mut polls = self.polls().lock(); let block_number = self.best_block_number(); - let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs(block_number, None, Default::default(), filter))); + let include_pending = filter.to_block == Some(BlockNumber::Pending); + let filter = filter.try_into()?; + let id = polls.create_poll(SyncPollFilter::new(PollFilter::Logs { + block_number, filter, include_pending, + last_block_hash: None, + previous_logs: Default::default() + })); Ok(id.into()) } @@ -195,15 +201,17 @@ impl EthFilter for T { // return new hashes Either::A(future::ok(FilterChanges::Hashes(new_hashes))) }, - PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => { + PollFilter::Logs { + ref mut block_number, + ref mut last_block_hash, + ref mut previous_logs, + ref filter, + include_pending, + } => { // retrive the current block number let current_number = self.best_block_number(); - // check if we need to check pending hashes - let include_pending = filter.to_block == Some(BlockNumber::Pending); - - // build appropriate filter - let mut filter: EthcoreFilter = filter.clone().into(); + let mut filter = filter.clone(); // retrieve reorg logs let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.removed_logs(h, &filter)); @@ -250,21 +258,19 @@ impl EthFilter for T { } fn filter_logs(&self, index: Index) -> BoxFuture> { - let filter = { + let (filter, include_pending) = { let mut polls = self.polls().lock(); match polls.poll(&index.value()).and_then(|f| f.modify(|filter| match *filter { - PollFilter::Logs(.., ref filter) => Some(filter.clone()), + PollFilter::Logs { ref filter, include_pending, .. } => + Some((filter.clone(), include_pending)), _ => None, })) { - Some(filter) => filter, + Some((filter, include_pending)) => (filter, include_pending), None => return Box::new(future::err(errors::filter_not_found())), } }; - let include_pending = filter.to_block == Some(BlockNumber::Pending); - let filter: EthcoreFilter = filter.into(); - // fetch pending logs. let pending = if include_pending { let best_block = self.best_block_number(); diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index 9f592b1fa..7961b1d18 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -252,9 +252,9 @@ impl ChainNotify for ChainNotificationHandler { self.notify_logs(route.route(), |filter, ex| { match ex { &ChainRouteType::Enacted => - Ok(self.client.logs(filter).into_iter().map(Into::into).collect()), + Ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).collect()), &ChainRouteType::Retracted => - Ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| { + Ok(self.client.logs(filter).unwrap_or_default().into_iter().map(Into::into).map(|mut log: Log| { log.log_type = "removed".into(); log.removed = true; log @@ -283,8 +283,13 @@ impl EthPubSub for EthPubSubClient { errors::invalid_params("newHeads", "Expected no parameters.") }, (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => { - self.logs_subscribers.write().push(subscriber, filter.into()); - return; + match filter.try_into() { + Ok(filter) => { + self.logs_subscribers.write().push(subscriber, filter); + return; + }, + Err(err) => err, + } }, (pubsub::Kind::Logs, _) => { errors::invalid_params("logs", "Expected a filter object.") diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index de2170f12..a22fffe27 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -502,8 +502,11 @@ impl Eth for EthClient { fn logs(&self, filter: Filter) -> BoxFuture> { let limit = filter.limit; - Box::new(Filterable::logs(self, filter.into()) - .map(move|logs| limit_logs(logs, limit))) + Box::new( + Filterable::logs(self, match filter.try_into() { + Ok(value) => value, + Err(err) => return Box::new(future::err(err)), + }).map(move |logs| limit_logs(logs, limit))) } fn work(&self, _timeout: Trailing) -> Result { diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index 602621194..33f820f1c 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -233,6 +233,15 @@ fn rpc_eth_logs() { assert_eq!(tester.io.handle_request_sync(request3), Some(response3.to_owned())); } +#[test] +fn rpc_eth_logs_error() { + let tester = EthTester::default(); + tester.client.set_error_on_logs(Some(BlockId::Hash(H256::from([5u8].as_ref())))); + let request = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{"limit":1,"blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000"}], "id": 1}"#; + let response = r#"{"jsonrpc":"2.0","error":{"code":-32000,"message":"One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found","data":"0x0500000000000000000000000000000000000000000000000000000000000000"},"id":1}"#; + assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned())); +} + #[test] fn rpc_logs_filter() { let tester = EthTester::default(); diff --git a/rpc/src/v1/types/filter.rs b/rpc/src/v1/types/filter.rs index dd8b823e8..6d3e94c70 100644 --- a/rpc/src/v1/types/filter.rs +++ b/rpc/src/v1/types/filter.rs @@ -17,9 +17,11 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::de::{Error, DeserializeOwned}; use serde_json::{Value, from_value}; +use jsonrpc_core::{Error as RpcError}; use ethcore::filter::Filter as EthFilter; use ethcore::client::BlockId; use v1::types::{BlockNumber, H160, H256, Log}; +use v1::helpers::errors::invalid_params; /// Variadic value #[derive(Debug, PartialEq, Eq, Clone, Hash)] @@ -62,6 +64,9 @@ pub struct Filter { /// To Block #[serde(rename="toBlock")] pub to_block: Option, + /// Block hash + #[serde(rename="blockHash")] + pub block_hash: Option, /// Address pub address: Option, /// Topics @@ -70,17 +75,30 @@ pub struct Filter { pub limit: Option, } -impl Into for Filter { - fn into(self) -> EthFilter { +impl Filter { + pub fn try_into(self) -> Result { + if self.block_hash.is_some() && (self.from_block.is_some() || self.to_block.is_some()) { + return Err(invalid_params("blockHash", "blockHash is mutually exclusive with fromBlock/toBlock")); + } + let num_to_id = |num| match num { BlockNumber::Num(n) => BlockId::Number(n), BlockNumber::Earliest => BlockId::Earliest, BlockNumber::Latest | BlockNumber::Pending => BlockId::Latest, }; - EthFilter { - from_block: self.from_block.map_or_else(|| BlockId::Latest, &num_to_id), - to_block: self.to_block.map_or_else(|| BlockId::Latest, &num_to_id), + let (from_block, to_block) = match self.block_hash { + Some(hash) => { + let hash = hash.into(); + (BlockId::Hash(hash), BlockId::Hash(hash)) + }, + None => + (self.from_block.map_or_else(|| BlockId::Latest, &num_to_id), + self.to_block.map_or_else(|| BlockId::Latest, &num_to_id)), + }; + + Ok(EthFilter { + from_block, to_block, address: self.address.and_then(|address| match address { VariadicValue::Null => None, VariadicValue::Single(a) => Some(vec![a.into()]), @@ -101,7 +119,7 @@ impl Into for Filter { ] }, limit: self.limit, - } + }) } } @@ -157,6 +175,7 @@ mod tests { assert_eq!(deserialized, Filter { from_block: Some(BlockNumber::Earliest), to_block: Some(BlockNumber::Latest), + block_hash: None, address: None, topics: None, limit: None, @@ -168,6 +187,7 @@ mod tests { let filter = Filter { from_block: Some(BlockNumber::Earliest), to_block: Some(BlockNumber::Latest), + block_hash: None, address: Some(VariadicValue::Multiple(vec![])), topics: Some(vec![ VariadicValue::Null, @@ -177,7 +197,7 @@ mod tests { limit: None, }; - let eth_filter: EthFilter = filter.into(); + let eth_filter: EthFilter = filter.try_into().unwrap(); assert_eq!(eth_filter, EthFilter { from_block: BlockId::Earliest, to_block: BlockId::Latest, diff --git a/rpc/src/v1/types/pubsub.rs b/rpc/src/v1/types/pubsub.rs index ea01d6427..db4af4e87 100644 --- a/rpc/src/v1/types/pubsub.rs +++ b/rpc/src/v1/types/pubsub.rs @@ -119,6 +119,7 @@ mod tests { assert_eq!(logs1, Params::Logs(Filter { from_block: None, to_block: None, + block_hash: None, address: None, topics: None, limit: None, @@ -126,6 +127,7 @@ mod tests { assert_eq!(logs2, Params::Logs(Filter { from_block: None, to_block: None, + block_hash: None, address: None, topics: None, limit: Some(10), @@ -133,6 +135,7 @@ mod tests { assert_eq!(logs3, Params::Logs(Filter { from_block: None, to_block: None, + block_hash: None, address: None, topics: Some(vec![ VariadicValue::Single("000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b".parse().unwrap() diff --git a/secret_store/src/listener/service_contract.rs b/secret_store/src/listener/service_contract.rs index daf70cd64..e4d54e0dc 100644 --- a/secret_store/src/listener/service_contract.rs +++ b/secret_store/src/listener/service_contract.rs @@ -283,7 +283,7 @@ impl ServiceContract for OnChainServiceContract { address: Some(vec![address]), topics: vec![Some(mask_topics(&self.mask))], limit: None, - }); + }).unwrap_or_default(); Box::new(request_logs.into_iter() .filter_map(|log| { diff --git a/updater/src/updater.rs b/updater/src/updater.rs index a3ed413c4..9dad11e9a 100644 --- a/updater/src/updater.rs +++ b/updater/src/updater.rs @@ -314,6 +314,7 @@ impl OperationsClient for OperationsContractClient { }; client.logs(filter) + .unwrap_or_default() .iter() .filter_map(|log| { let event = event.parse_log((log.topics.clone(), log.data.clone()).into()).ok()?; @@ -618,7 +619,7 @@ impl Updater