diff --git a/Cargo.lock b/Cargo.lock index 0003098a8..03b8092a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,7 @@ dependencies = [ "serde_codegen 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.29.0 (registry+https://github.com/rust-lang/crates.io-index)", + "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -821,6 +822,14 @@ name = "traitobject" version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "transient-hashmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "typeable" version = "0.1.2" diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index cc5e23869..422f1eaa2 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -123,6 +123,9 @@ pub trait BlockChainClient : Sync + Send { /// Get block total difficulty. fn block_total_difficulty(&self, id: BlockId) -> Option; + /// Get block hash. + fn block_hash(&self, id: BlockId) -> Option; + /// Get address code. fn code(&self, address: &Address) -> Option; @@ -540,6 +543,11 @@ impl BlockChainClient for Client { Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) } + fn block_hash(&self, id: BlockId) -> Option { + let chain = self.chain.read().unwrap(); + Self::block_hash(&chain, id) + } + fn code(&self, address: &Address) -> Option { self.state().code(address) } diff --git a/ethcore/src/filter.rs b/ethcore/src/filter.rs index 95c5687a7..9bfebf52f 100644 --- a/ethcore/src/filter.rs +++ b/ethcore/src/filter.rs @@ -42,6 +42,22 @@ pub struct Filter { pub topics: [Option>; 4], } +impl Clone for Filter { + fn clone(&self) -> Self { + let mut topics = [None, None, None, None]; + for i in 0..4 { + topics[i] = self.topics[i].clone(); + } + + Filter { + from_block: self.from_block.clone(), + to_block: self.to_block.clone(), + address: self.address.clone(), + topics: topics + } + } +} + impl Filter { /// Returns combinations of each address and topic. pub fn bloom_possibilities(&self) -> Vec { diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 086fb19c1..bfdf8f2d3 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -20,6 +20,7 @@ ethash = { path = "../ethash" } ethsync = { path = "../sync" } clippy = { version = "0.0.44", optional = true } rustc-serialize = "0.3" +transient-hashmap = "0.1" serde_macros = { version = "0.7.0", optional = true } [build-dependencies] diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 4559f766c..0653a0c33 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -27,6 +27,7 @@ extern crate jsonrpc_http_server; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate transient_hashmap; use self::jsonrpc_core::{IoHandler, IoDelegate}; diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs new file mode 100644 index 000000000..b1a5c05ba --- /dev/null +++ b/rpc/src/v1/helpers/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod poll_manager; +mod poll_filter; + +pub use self::poll_manager::PollManager; +pub use self::poll_filter::PollFilter; diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs new file mode 100644 index 000000000..465290270 --- /dev/null +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -0,0 +1,10 @@ +//! Helper type with all filter possibilities. + +use ethcore::filter::Filter; + +#[derive(Clone)] +pub enum PollFilter { + Block, + PendingTransaction, + Logs(Filter) +} diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs new file mode 100644 index 000000000..36a6352c2 --- /dev/null +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -0,0 +1,144 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Indexes all rpc poll requests. + +use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; + +/// Lifetime of poll (in seconds). +const POLL_LIFETIME: u64 = 60; + +pub type PollId = usize; +pub type BlockNumber = u64; + +pub struct PollInfo { + pub filter: F, + pub block_number: BlockNumber +} + +impl Clone for PollInfo where F: Clone { + fn clone(&self) -> Self { + PollInfo { + filter: self.filter.clone(), + block_number: self.block_number.clone() + } + } +} + +/// Indexes all poll requests. +/// +/// Lazily garbage collects unused polls info. +pub struct PollManager where T: Timer { + polls: TransientHashMap, T>, + next_available_id: PollId +} + +impl PollManager { + /// Creates new instance of indexer. + pub fn new() -> Self { + PollManager::new_with_timer(Default::default()) + } +} + +impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { + PollManager { + polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), + next_available_id: 0, + } + } + + /// Returns id which can be used for new poll. + /// + /// Stores information when last poll happend. + pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { + self.polls.prune(); + let id = self.next_available_id; + self.next_available_id += 1; + self.polls.insert(id, PollInfo { + filter: filter, + block_number: block, + }); + id + } + + /// Updates information when last poll happend. + pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { + self.polls.prune(); + if let Some(info) = self.polls.get_mut(id) { + info.block_number = block; + } + } + + /// Returns number of block when last poll happend. + pub fn get_poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + self.polls.prune(); + self.polls.get(id) + } + + /// Removes poll info. + pub fn remove_poll(&mut self, id: &PollId) { + self.polls.remove(id); + } +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use transient_hashmap::Timer; + use v1::helpers::PollManager; + + struct TestTimer<'a> { + time: &'a RefCell, + } + + impl<'a> Timer for TestTimer<'a> { + fn get_time(&self) -> i64 { + *self.time.borrow() + } + } + + #[test] + fn test_poll_indexer() { + let time = RefCell::new(0); + let timer = TestTimer { + time: &time, + }; + + let mut indexer = PollManager::new_with_timer(timer); + assert_eq!(indexer.create_poll(false, 20), 0); + assert_eq!(indexer.create_poll(true, 20), 1); + + *time.borrow_mut() = 10; + indexer.update_poll(&0, 21); + assert_eq!(indexer.get_poll_info(&0).unwrap().filter, false); + assert_eq!(indexer.get_poll_info(&0).unwrap().block_number, 21); + + *time.borrow_mut() = 30; + indexer.update_poll(&1, 23); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + *time.borrow_mut() = 75; + indexer.update_poll(&0, 30); + assert!(indexer.get_poll_info(&0).is_none()); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + indexer.remove_poll(&1); + assert!(indexer.get_poll_info(&1).is_none()); + } +} diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 71d05053c..2313d5114 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,11 +15,12 @@ // along with Parity. If not, see . //! Eth rpc implementation. +use std::collections::HashMap; +use std::sync::{Arc, Weak, Mutex, RwLock}; use ethsync::{EthSync, SyncState}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; -use util::standard::{RwLock, HashMap, Arc, Weak}; use util::rlp::encode; use ethcore::client::*; use ethcore::block::{IsBlock}; @@ -29,6 +30,7 @@ use ethcore::ethereum::Ethash; use ethcore::ethereum::denominations::shannon; use v1::traits::{Eth, EthFilter}; use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, OptionalValue, Index, Filter, Log}; +use v1::helpers::{PollFilter, PollManager}; /// Eth rpc implementation. pub struct EthClient { @@ -255,28 +257,98 @@ impl Eth for EthClient { /// Eth filter rpc implementation. pub struct EthFilterClient { - client: Weak + client: Weak, + polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. pub fn new(client: &Arc) -> Self { EthFilterClient { - client: Arc::downgrade(client) + client: Arc::downgrade(client), + polls: Mutex::new(PollManager::new()) } } } impl EthFilter for EthFilterClient { - fn new_block_filter(&self, _params: Params) -> Result { - Ok(Value::U64(0)) + fn new_filter(&self, params: Params) -> Result { + from_params::<(Filter,)>(params) + .and_then(|(filter,)| { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }) } - fn new_pending_transaction_filter(&self, _params: Params) -> Result { - Ok(Value::U64(1)) + fn new_block_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } } - fn filter_changes(&self, _: Params) -> Result { - to_value(&take_weak!(self.client).chain_info().best_block_hash).map(|v| Value::Array(vec![v])) + fn new_pending_transaction_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } + } + + fn filter_changes(&self, params: Params) -> Result { + let client = take_weak!(self.client); + from_params::<(Index,)>(params) + .and_then(|(index,)| { + let info = self.polls.lock().unwrap().get_poll_info(&index.value()).cloned(); + match info { + None => Ok(Value::Array(vec![] as Vec)), + Some(info) => match info.filter { + PollFilter::Block => { + let current_number = client.chain_info().best_block_number; + let hashes = (info.block_number..current_number).into_iter() + .map(BlockId::Number) + .filter_map(|id| client.block_hash(id)) + .collect::>(); + + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&hashes) + }, + PollFilter::PendingTransaction => { + // TODO: fix implementation once TransactionQueue is merged + to_value(&vec![] as &Vec) + }, + PollFilter::Logs(mut filter) => { + filter.from_block = BlockId::Number(info.block_number); + filter.to_block = BlockId::Latest; + let logs = client.logs(filter) + .into_iter() + .map(From::from) + .collect::>(); + + let current_number = client.chain_info().best_block_number; + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&logs) + } + } + } + }) + } + + fn uninstall_filter(&self, params: Params) -> Result { + from_params::<(Index,)>(params) + .and_then(|(index,)| { + self.polls.lock().unwrap().remove_poll(&index.value()); + to_value(&true) + }) } } diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 01635e872..57f7a944a 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -23,6 +23,7 @@ mod impls; mod types; #[cfg(test)] mod tests; +mod helpers; pub use self::traits::{Web3, Eth, EthFilter, Net}; pub use self::impls::*; diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index b788e0c2a..e9c5f0edc 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -105,6 +105,10 @@ impl BlockChainClient for TestBlockChainClient { Some(U256::zero()) } + fn block_hash(&self, id: BlockId) -> Option { + unimplemented!(); + } + fn code(&self, _address: &Address) -> Option { unimplemented!(); }