diff --git a/Cargo.lock b/Cargo.lock index 0003098a8..03b8092a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,7 @@ dependencies = [ "serde_codegen 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.29.0 (registry+https://github.com/rust-lang/crates.io-index)", + "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -821,6 +822,14 @@ name = "traitobject" version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "transient-hashmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "typeable" version = "0.1.2" diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 394c16e85..e529f50af 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -466,7 +466,8 @@ impl BlockChain { let mut write_details = self.block_details.write().unwrap(); for (hash, details) in update.block_details.into_iter() { batch.put_extras(&hash, &details); - write_details.insert(hash, details); + write_details.insert(hash.clone(), details); + self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash)); } let mut write_receipts = self.block_receipts.write().unwrap(); @@ -802,6 +803,14 @@ impl BlockChain { // TODO: handle block_hashes properly. block_hashes.clear(); + + blocks.shrink_to_fit(); + block_details.shrink_to_fit(); + block_hashes.shrink_to_fit(); + transaction_addresses.shrink_to_fit(); + block_logs.shrink_to_fit(); + blocks_blooms.shrink_to_fit(); + block_receipts.shrink_to_fit(); } if self.cache_size().total() < self.max_cache_size { break; } } diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index bcddec85a..6deb7ce80 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -123,6 +123,9 @@ pub trait BlockChainClient : Sync + Send { /// Get block total difficulty. fn block_total_difficulty(&self, id: BlockId) -> Option; + /// Get block hash. + fn block_hash(&self, id: BlockId) -> Option; + /// Get address code. fn code(&self, address: &Address) -> Option; @@ -540,6 +543,11 @@ impl BlockChainClient for Client { Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) } + fn block_hash(&self, id: BlockId) -> Option { + let chain = self.chain.read().unwrap(); + Self::block_hash(&chain, id) + } + fn code(&self, address: &Address) -> Option { self.state().code(address) } diff --git a/ethcore/src/filter.rs b/ethcore/src/filter.rs index 95c5687a7..9bfebf52f 100644 --- a/ethcore/src/filter.rs +++ b/ethcore/src/filter.rs @@ -42,6 +42,22 @@ pub struct Filter { pub topics: [Option>; 4], } +impl Clone for Filter { + fn clone(&self) -> Self { + let mut topics = [None, None, None, None]; + for i in 0..4 { + topics[i] = self.topics[i].clone(); + } + + Filter { + from_block: self.from_block.clone(), + to_block: self.to_block.clone(), + address: self.address.clone(), + topics: topics + } + } +} + impl Filter { /// Returns combinations of each address and topic. pub fn bloom_possibilities(&self) -> Vec { diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 65c9d7358..001d1729b 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -108,6 +108,25 @@ fn can_collect_garbage() { assert!(client.blockchain_cache_info().blocks < 100 * 1024); } +#[test] +fn can_handle_long_fork() { + let client_result = generate_dummy_client(1200); + let client = client_result.reference(); + for _ in 0..10 { + client.import_verified_blocks(&IoChannel::disconnected()); + } + assert_eq!(1200, client.chain_info().best_block_number); + + push_blocks_to_client(client, 45, 1201, 800); + push_blocks_to_client(client, 49, 1201, 800); + push_blocks_to_client(client, 53, 1201, 600); + + for _ in 0..20 { + client.import_verified_blocks(&IoChannel::disconnected()); + } + assert_eq!(2000, client.chain_info().best_block_number); +} + #[test] fn can_mine() { let dummy_blocks = get_good_dummy_block_seq(2); @@ -122,7 +141,7 @@ fn can_mine() { b.hash() } None => { panic!(); } - } + } }; assert!(client.submit_seal(pow_hash, vec![]).is_ok()); -} \ No newline at end of file +} diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 44ad667b9..bb9a44614 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -156,10 +156,9 @@ pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult rolling_block_number = rolling_block_number + 1; rolling_timestamp = rolling_timestamp + 10; - if let Err(_) = client.import_block(create_test_block(&header)) { - panic!("error importing block which is valid by definition"); + if let Err(e) = client.import_block(create_test_block(&header)) { + panic!("error importing block which is valid by definition: {:?}", e); } - } client.flush_queue(); client.import_verified_blocks(&IoChannel::disconnected()); @@ -170,6 +169,34 @@ pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult } } +pub fn push_blocks_to_client(client: &Arc, timestamp_salt: u64, starting_number: usize, block_number: usize) { + let test_spec = get_test_spec(); + let test_engine = test_spec.to_engine().unwrap(); + let state_root = test_engine.spec().genesis_header().state_root; + let mut rolling_hash = client.chain_info().best_block_hash; + let mut rolling_block_number = starting_number as u64; + let mut rolling_timestamp = timestamp_salt + starting_number as u64 * 10; + + for _ in 0..block_number { + let mut header = Header::new(); + + header.gas_limit = decode(test_engine.spec().engine_params.get("minGasLimit").unwrap()); + header.difficulty = decode(test_engine.spec().engine_params.get("minimumDifficulty").unwrap()); + header.timestamp = rolling_timestamp; + header.number = rolling_block_number; + header.parent_hash = rolling_hash; + header.state_root = state_root.clone(); + + rolling_hash = header.hash(); + rolling_block_number = rolling_block_number + 1; + rolling_timestamp = rolling_timestamp + 10; + + if let Err(e) = client.import_block(create_test_block(&header)) { + panic!("error importing block which is valid by definition: {:?}", e); + } + } +} + pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult> { let dir = RandomTempPath::new(); let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); @@ -253,18 +280,29 @@ pub fn get_temp_state_in(path: &Path) -> State { pub fn get_good_dummy_block_seq(count: usize) -> Vec { let test_spec = get_test_spec(); let test_engine = test_spec.to_engine().unwrap(); - let mut parent = test_engine.spec().genesis_header().hash(); + get_good_dummy_block_fork_seq(1, count, &test_engine.spec().genesis_header().hash()) +} + +pub fn get_good_dummy_block_fork_seq(start_number: usize, count: usize, parent_hash: &H256) -> Vec { + let test_spec = get_test_spec(); + let test_engine = test_spec.to_engine().unwrap(); + let mut rolling_timestamp = start_number as u64 * 10; + let mut parent = *parent_hash; let mut r = Vec::new(); - for i in 1 .. count + 1 { + for i in start_number .. start_number + count + 1 { let mut block_header = Header::new(); block_header.gas_limit = decode(test_engine.spec().engine_params.get("minGasLimit").unwrap()); - block_header.difficulty = decode(test_engine.spec().engine_params.get("minimumDifficulty").unwrap()); - block_header.timestamp = i as u64; + block_header.difficulty = U256::from(i).mul(U256([0, 1, 0, 0])); + block_header.timestamp = rolling_timestamp; block_header.number = i as u64; block_header.parent_hash = parent; block_header.state_root = test_engine.spec().genesis_header().state_root; + parent = block_header.hash(); + rolling_timestamp = rolling_timestamp + 10; + r.push(create_test_block(&block_header)); + } r } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 086fb19c1..bfdf8f2d3 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -20,6 +20,7 @@ ethash = { path = "../ethash" } ethsync = { path = "../sync" } clippy = { version = "0.0.44", optional = true } rustc-serialize = "0.3" +transient-hashmap = "0.1" serde_macros = { version = "0.7.0", optional = true } [build-dependencies] diff --git a/rpc/build.rs b/rpc/build.rs index fe1a55694..b5adeaba1 100644 --- a/rpc/build.rs +++ b/rpc/build.rs @@ -9,8 +9,8 @@ mod inner { pub fn main() { let out_dir = env::var_os("OUT_DIR").unwrap(); - let src = Path::new("src/lib.rs.in"); - let dst = Path::new(&out_dir).join("lib.rs"); + let src = Path::new("src/v1/types/mod.rs.in"); + let dst = Path::new(&out_dir).join("mod.rs"); let mut registry = syntex::Registry::new(); diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8dd58d0b8..0653a0c33 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -27,9 +27,35 @@ extern crate jsonrpc_http_server; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate transient_hashmap; -#[cfg(feature = "serde_macros")] -include!("lib.rs.in"); +use self::jsonrpc_core::{IoHandler, IoDelegate}; -#[cfg(not(feature = "serde_macros"))] -include!(concat!(env!("OUT_DIR"), "/lib.rs")); +pub mod v1; + +/// Http server. +pub struct HttpServer { + handler: IoHandler, + threads: usize +} + +impl HttpServer { + /// Construct new http server object with given number of threads. + pub fn new(threads: usize) -> HttpServer { + HttpServer { + handler: IoHandler::new(), + threads: threads + } + } + + /// Add io delegate. + pub fn add_delegate(&mut self, delegate: IoDelegate) where D: Send + Sync + 'static { + self.handler.add_delegate(delegate); + } + + /// Start server asynchronously in new thread + pub fn start_async(self, addr: &str, cors_domain: &str) { + let server = jsonrpc_http_server::Server::new(self.handler, self.threads); + server.start_async(addr, jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain.to_owned())) + } +} diff --git a/rpc/src/lib.rs.in b/rpc/src/lib.rs.in deleted file mode 100644 index e17f2b3bb..000000000 --- a/rpc/src/lib.rs.in +++ /dev/null @@ -1,30 +0,0 @@ -use self::jsonrpc_core::{IoHandler, IoDelegate}; - -pub mod v1; - -/// Http server. -pub struct HttpServer { - handler: IoHandler, - threads: usize -} - -impl HttpServer { - /// Construct new http server object with given number of threads. - pub fn new(threads: usize) -> HttpServer { - HttpServer { - handler: IoHandler::new(), - threads: threads - } - } - - /// Add io delegate. - pub fn add_delegate(&mut self, delegate: IoDelegate) where D: Send + Sync + 'static { - self.handler.add_delegate(delegate); - } - - /// Start server asynchronously in new thread - pub fn start_async(self, addr: &str, cors_domain: &str) { - let server = jsonrpc_http_server::Server::new(self.handler, self.threads); - server.start_async(addr, jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain.to_owned())) - } -} diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs new file mode 100644 index 000000000..b1a5c05ba --- /dev/null +++ b/rpc/src/v1/helpers/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod poll_manager; +mod poll_filter; + +pub use self::poll_manager::PollManager; +pub use self::poll_filter::PollFilter; diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs new file mode 100644 index 000000000..465290270 --- /dev/null +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -0,0 +1,10 @@ +//! Helper type with all filter possibilities. + +use ethcore::filter::Filter; + +#[derive(Clone)] +pub enum PollFilter { + Block, + PendingTransaction, + Logs(Filter) +} diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs new file mode 100644 index 000000000..36a6352c2 --- /dev/null +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -0,0 +1,144 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Indexes all rpc poll requests. + +use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; + +/// Lifetime of poll (in seconds). +const POLL_LIFETIME: u64 = 60; + +pub type PollId = usize; +pub type BlockNumber = u64; + +pub struct PollInfo { + pub filter: F, + pub block_number: BlockNumber +} + +impl Clone for PollInfo where F: Clone { + fn clone(&self) -> Self { + PollInfo { + filter: self.filter.clone(), + block_number: self.block_number.clone() + } + } +} + +/// Indexes all poll requests. +/// +/// Lazily garbage collects unused polls info. +pub struct PollManager where T: Timer { + polls: TransientHashMap, T>, + next_available_id: PollId +} + +impl PollManager { + /// Creates new instance of indexer. + pub fn new() -> Self { + PollManager::new_with_timer(Default::default()) + } +} + +impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { + PollManager { + polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), + next_available_id: 0, + } + } + + /// Returns id which can be used for new poll. + /// + /// Stores information when last poll happend. + pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { + self.polls.prune(); + let id = self.next_available_id; + self.next_available_id += 1; + self.polls.insert(id, PollInfo { + filter: filter, + block_number: block, + }); + id + } + + /// Updates information when last poll happend. + pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { + self.polls.prune(); + if let Some(info) = self.polls.get_mut(id) { + info.block_number = block; + } + } + + /// Returns number of block when last poll happend. + pub fn get_poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + self.polls.prune(); + self.polls.get(id) + } + + /// Removes poll info. + pub fn remove_poll(&mut self, id: &PollId) { + self.polls.remove(id); + } +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use transient_hashmap::Timer; + use v1::helpers::PollManager; + + struct TestTimer<'a> { + time: &'a RefCell, + } + + impl<'a> Timer for TestTimer<'a> { + fn get_time(&self) -> i64 { + *self.time.borrow() + } + } + + #[test] + fn test_poll_indexer() { + let time = RefCell::new(0); + let timer = TestTimer { + time: &time, + }; + + let mut indexer = PollManager::new_with_timer(timer); + assert_eq!(indexer.create_poll(false, 20), 0); + assert_eq!(indexer.create_poll(true, 20), 1); + + *time.borrow_mut() = 10; + indexer.update_poll(&0, 21); + assert_eq!(indexer.get_poll_info(&0).unwrap().filter, false); + assert_eq!(indexer.get_poll_info(&0).unwrap().block_number, 21); + + *time.borrow_mut() = 30; + indexer.update_poll(&1, 23); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + *time.borrow_mut() = 75; + indexer.update_poll(&0, 30); + assert!(indexer.get_poll_info(&0).is_none()); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + indexer.remove_poll(&1); + assert!(indexer.get_poll_info(&1).is_none()); + } +} diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index 71d05053c..2313d5114 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,11 +15,12 @@ // along with Parity. If not, see . //! Eth rpc implementation. +use std::collections::HashMap; +use std::sync::{Arc, Weak, Mutex, RwLock}; use ethsync::{EthSync, SyncState}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; -use util::standard::{RwLock, HashMap, Arc, Weak}; use util::rlp::encode; use ethcore::client::*; use ethcore::block::{IsBlock}; @@ -29,6 +30,7 @@ use ethcore::ethereum::Ethash; use ethcore::ethereum::denominations::shannon; use v1::traits::{Eth, EthFilter}; use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, OptionalValue, Index, Filter, Log}; +use v1::helpers::{PollFilter, PollManager}; /// Eth rpc implementation. pub struct EthClient { @@ -255,28 +257,98 @@ impl Eth for EthClient { /// Eth filter rpc implementation. pub struct EthFilterClient { - client: Weak + client: Weak, + polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. pub fn new(client: &Arc) -> Self { EthFilterClient { - client: Arc::downgrade(client) + client: Arc::downgrade(client), + polls: Mutex::new(PollManager::new()) } } } impl EthFilter for EthFilterClient { - fn new_block_filter(&self, _params: Params) -> Result { - Ok(Value::U64(0)) + fn new_filter(&self, params: Params) -> Result { + from_params::<(Filter,)>(params) + .and_then(|(filter,)| { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }) } - fn new_pending_transaction_filter(&self, _params: Params) -> Result { - Ok(Value::U64(1)) + fn new_block_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } } - fn filter_changes(&self, _: Params) -> Result { - to_value(&take_weak!(self.client).chain_info().best_block_hash).map(|v| Value::Array(vec![v])) + fn new_pending_transaction_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } + } + + fn filter_changes(&self, params: Params) -> Result { + let client = take_weak!(self.client); + from_params::<(Index,)>(params) + .and_then(|(index,)| { + let info = self.polls.lock().unwrap().get_poll_info(&index.value()).cloned(); + match info { + None => Ok(Value::Array(vec![] as Vec)), + Some(info) => match info.filter { + PollFilter::Block => { + let current_number = client.chain_info().best_block_number; + let hashes = (info.block_number..current_number).into_iter() + .map(BlockId::Number) + .filter_map(|id| client.block_hash(id)) + .collect::>(); + + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&hashes) + }, + PollFilter::PendingTransaction => { + // TODO: fix implementation once TransactionQueue is merged + to_value(&vec![] as &Vec) + }, + PollFilter::Logs(mut filter) => { + filter.from_block = BlockId::Number(info.block_number); + filter.to_block = BlockId::Latest; + let logs = client.logs(filter) + .into_iter() + .map(From::from) + .collect::>(); + + let current_number = client.chain_info().best_block_number; + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&logs) + } + } + } + }) + } + + fn uninstall_filter(&self, params: Params) -> Result { + from_params::<(Index,)>(params) + .and_then(|(index,)| { + self.polls.lock().unwrap().remove_poll(&index.value()); + to_value(&true) + }) } } diff --git a/rpc/src/v1/impls/web3.rs b/rpc/src/v1/impls/web3.rs index 4d31f73ae..64a82adb9 100644 --- a/rpc/src/v1/impls/web3.rs +++ b/rpc/src/v1/impls/web3.rs @@ -30,9 +30,7 @@ impl Web3Client { impl Web3 for Web3Client { fn client_version(&self, params: Params) -> Result { match params { - Params::None => { - Ok(Value::String(version().to_owned().replace("Parity/", "Parity//"))), - } + Params::None => Ok(Value::String(version().to_owned().replace("Parity/", "Parity//"))), _ => Err(Error::invalid_params()) } } diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 01635e872..57f7a944a 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -23,6 +23,7 @@ mod impls; mod types; #[cfg(test)] mod tests; +mod helpers; pub use self::traits::{Web3, Eth, EthFilter, Net}; pub use self::impls::*; diff --git a/rpc/src/v1/types/mod.rs b/rpc/src/v1/types/mod.rs index 34c1f1cff..adf9be071 100644 --- a/rpc/src/v1/types/mod.rs +++ b/rpc/src/v1/types/mod.rs @@ -14,22 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . -mod block; -mod block_number; -mod bytes; -mod filter; -mod index; -mod log; -mod optionals; -mod sync; -mod transaction; +#[cfg(feature = "serde_macros")] +include!("mod.rs.in"); -pub use self::block::{Block, BlockTransactions}; -pub use self::block_number::BlockNumber; -pub use self::bytes::Bytes; -pub use self::filter::Filter; -pub use self::index::Index; -pub use self::log::Log; -pub use self::optionals::OptionalValue; -pub use self::sync::{SyncStatus, SyncInfo}; -pub use self::transaction::Transaction; +#[cfg(not(feature = "serde_macros"))] +include!(concat!(env!("OUT_DIR"), "/mod.rs")); diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in new file mode 100644 index 000000000..34c1f1cff --- /dev/null +++ b/rpc/src/v1/types/mod.rs.in @@ -0,0 +1,35 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod block; +mod block_number; +mod bytes; +mod filter; +mod index; +mod log; +mod optionals; +mod sync; +mod transaction; + +pub use self::block::{Block, BlockTransactions}; +pub use self::block_number::BlockNumber; +pub use self::bytes::Bytes; +pub use self::filter::Filter; +pub use self::index::Index; +pub use self::log::Log; +pub use self::optionals::OptionalValue; +pub use self::sync::{SyncStatus, SyncInfo}; +pub use self::transaction::Transaction; diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 601ca3869..fd690e790 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -635,16 +635,7 @@ impl ChainSync { match self.last_imported_block { None => 0, Some(x) => x } } - /// Find some headers or blocks to download for a peer. - fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { - self.clear_peer_download(peer_id); - - if io.chain().queue_info().is_full() { - self.pause_sync(); - return; - } - - // check to see if we need to download any block bodies first + fn find_block_bodies_hashes_to_request(&self, ignore_others: bool) -> (Vec, Vec) { let mut needed_bodies: Vec = Vec::new(); let mut needed_numbers: Vec = Vec::new(); @@ -664,74 +655,88 @@ impl ChainSync { } } } + (needed_bodies, needed_numbers) + } + + /// Find some headers or blocks to download for a peer. + fn request_blocks(&mut self, io: &mut SyncIo, peer_id: PeerId, ignore_others: bool) { + self.clear_peer_download(peer_id); + + if io.chain().queue_info().is_full() { + self.pause_sync(); + return; + } + + // check to see if we need to download any block bodies first + let (needed_bodies, needed_numbers) = self.find_block_bodies_hashes_to_request(ignore_others); if !needed_bodies.is_empty() { let (head, _) = self.headers.range_iter().next().unwrap(); if needed_numbers.first().unwrap() - head > self.max_download_ahead_blocks as BlockNumber { trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading block bodies", peer_id, needed_numbers.first().unwrap(), head); self.request_blocks(io, peer_id, true); - return; + } else { + self.downloading_bodies.extend(needed_numbers.iter()); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); + self.request_bodies(io, peer_id, needed_bodies); + } + return; + } + + // check if need to download headers + let mut start = 0; + if !self.have_common_block { + // download backwards until common block is found 1 header at a time + let chain_info = io.chain().chain_info(); + start = chain_info.best_block_number; + if !self.headers.is_empty() { + start = min(start, self.headers.range_iter().next().unwrap().0 - 1); + } + if start == 0 { + self.have_common_block = true; //reached genesis + self.last_imported_hash = Some(chain_info.genesis_hash); + self.last_imported_block = Some(0); + } + } + if self.have_common_block { + let mut headers: Vec = Vec::new(); + let mut prev = self.current_base_block() + 1; + let head = self.headers.range_iter().next().map(|(h, _)| h); + for (next, ref items) in self.headers.range_iter() { + if !headers.is_empty() { + break; + } + if next <= prev { + prev = next + items.len() as BlockNumber; + continue; + } + let mut block = prev; + while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { + if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { + headers.push(block as BlockNumber); + } + block += 1; + } + prev = next + items.len() as BlockNumber; + } + + if !headers.is_empty() { + start = headers[0]; + if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { + trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); + self.request_blocks(io, peer_id, true); + return; + } + let count = headers.len(); + self.downloading_headers.extend(headers.iter()); + replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); + assert!(!self.headers.have_item(&start)); + self.request_headers_by_number(io, peer_id, start, count, 0, false); } - self.downloading_bodies.extend(needed_numbers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, needed_numbers); - self.request_bodies(io, peer_id, needed_bodies); } else { - // check if need to download headers - let mut start = 0; - if !self.have_common_block { - // download backwards until common block is found 1 header at a time - let chain_info = io.chain().chain_info(); - start = chain_info.best_block_number; - if !self.headers.is_empty() { - start = min(start, self.headers.range_iter().next().unwrap().0 - 1); - } - if start == 0 { - self.have_common_block = true; //reached genesis - self.last_imported_hash = Some(chain_info.genesis_hash); - self.last_imported_block = Some(0); - } - } - if self.have_common_block { - let mut headers: Vec = Vec::new(); - let mut prev = self.current_base_block() + 1; - let head = self.headers.range_iter().next().map(|(h, _)| h); - for (next, ref items) in self.headers.range_iter() { - if !headers.is_empty() { - break; - } - if next <= prev { - prev = next + items.len() as BlockNumber; - continue; - } - let mut block = prev; - while block < next && headers.len() < MAX_HEADERS_TO_REQUEST { - if ignore_others || !self.downloading_headers.contains(&(block as BlockNumber)) { - headers.push(block as BlockNumber); - } - block += 1; - } - prev = next + items.len() as BlockNumber; - } - - if !headers.is_empty() { - start = headers[0]; - if head.is_some() && start > head.unwrap() && start - head.unwrap() > self.max_download_ahead_blocks as BlockNumber { - trace!(target: "sync", "{}: Stalled download ({} vs {}), helping with downloading headers", peer_id, start, head.unwrap()); - self.request_blocks(io, peer_id, true); - return; - } - let count = headers.len(); - self.downloading_headers.extend(headers.iter()); - replace(&mut self.peers.get_mut(&peer_id).unwrap().asking_blocks, headers); - assert!(!self.headers.have_item(&start)); - self.request_headers_by_number(io, peer_id, start, count, 0, false); - } - } - else { - // continue search for common block - self.downloading_headers.insert(start); - self.request_headers_by_number(io, peer_id, start, 1, 0, false); - } + // continue search for common block + self.downloading_headers.insert(start); + self.request_headers_by_number(io, peer_id, start, 1, 0, false); } } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index b788e0c2a..e9c5f0edc 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -105,6 +105,10 @@ impl BlockChainClient for TestBlockChainClient { Some(U256::zero()) } + fn block_hash(&self, id: BlockId) -> Option { + unimplemented!(); + } + fn code(&self, _address: &Address) -> Option { unimplemented!(); } diff --git a/util/bigint/src/uint.rs b/util/bigint/src/uint.rs index 0691ec09b..a70997dc5 100644 --- a/util/bigint/src/uint.rs +++ b/util/bigint/src/uint.rs @@ -2003,6 +2003,7 @@ mod tests { #[test] + #[cfg_attr(feature = "dev", allow(cyclomatic_complexity))] fn u256_multi_full_mul() { let result = U256([0, 0, 0, 0]).full_mul(U256([0, 0, 0, 0])); assert_eq!(U512([0, 0, 0, 0, 0, 0, 0, 0]), result); diff --git a/util/src/table.rs b/util/src/table.rs index f04a498f8..e41209608 100644 --- a/util/src/table.rs +++ b/util/src/table.rs @@ -111,7 +111,7 @@ impl Table /// /// Returns previous value (if any) pub fn insert(&mut self, row: Row, col: Col, val: Val) -> Option { - self.map.entry(row).or_insert_with(|| HashMap::new()).insert(col, val) + self.map.entry(row).or_insert_with(HashMap::new).insert(col, val) } }