diff --git a/.travis.yml b/.travis.yml index 8d2349dae..7213b8f09 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,8 @@ matrix: allow_failures: - rust: nightly include: + - rust: stable + env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: beta env: FEATURES="--features travis-beta" KCOV_FEATURES="" TARGETS="-p ethash -p ethcore-util -p ethcore -p ethsync -p ethcore-rpc -p parity" ARCHIVE_SUFFIX="-${TRAVIS_OS_NAME}-${TRAVIS_TAG}" - rust: nightly @@ -52,7 +54,7 @@ after_success: | ./kcov-master/tmp/usr/local/bin/kcov --coveralls-id=${TRAVIS_JOB_ID} --exclude-pattern /usr/,/.cargo,/root/.multirust target/kcov target/debug/parity-* && [ $TRAVIS_BRANCH = master ] && [ $TRAVIS_PULL_REQUEST = false ] && - [ $TRAVIS_RUST_VERSION = beta ] && + [ $TRAVIS_RUST_VERSION = stable ] && cargo doc --no-deps --verbose ${KCOV_FEATURES} ${TARGETS} && echo '' > target/doc/index.html && pip install --user ghp-import && diff --git a/Cargo.lock b/Cargo.lock index bd59e41fe..510e69b59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,6 +81,15 @@ name = "cfg-if" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "chrono" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "clippy" version = "0.0.44" @@ -235,6 +244,7 @@ dependencies = [ "serde_codegen 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "syntex 0.29.0 (registry+https://github.com/rust-lang/crates.io-index)", + "transient-hashmap 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -243,6 +253,7 @@ version = "0.9.99" dependencies = [ "arrayvec 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "bigint 0.1.0", + "chrono 0.2.19 (registry+https://github.com/rust-lang/crates.io-index)", "clippy 0.0.44 (registry+https://github.com/rust-lang/crates.io-index)", "crossbeam 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", "elastic-array 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -841,6 +852,14 @@ name = "traitobject" version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "transient-hashmap" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "typeable" version = "0.1.2" diff --git a/ethcore/src/block.rs b/ethcore/src/block.rs index f5788baba..68f647e37 100644 --- a/ethcore/src/block.rs +++ b/ethcore/src/block.rs @@ -220,8 +220,8 @@ impl<'x> OpenBlock<'x> { /// NOTE Will check chain constraints and the uncle number but will NOT check /// that the header itself is actually valid. 
pub fn push_uncle(&mut self, valid_uncle_header: Header) -> Result<(), BlockError> { - if self.block.base.uncles.len() >= self.engine.maximum_uncle_count() { - return Err(BlockError::TooManyUncles(OutOfBounds{min: None, max: Some(self.engine.maximum_uncle_count()), found: self.block.base.uncles.len()})); + if self.block.base.uncles.len() + 1 > self.engine.maximum_uncle_count() { + return Err(BlockError::TooManyUncles(OutOfBounds{min: None, max: Some(self.engine.maximum_uncle_count()), found: self.block.base.uncles.len() + 1})); } // TODO: check number // TODO: check not a direct ancestor (use last_hashes for that) diff --git a/ethcore/src/blockchain/blockchain.rs b/ethcore/src/blockchain/blockchain.rs index 185bcaad3..e529f50af 100644 --- a/ethcore/src/blockchain/blockchain.rs +++ b/ethcore/src/blockchain/blockchain.rs @@ -78,7 +78,7 @@ pub trait BlockProvider { } /// Get a list of uncles for a given block. - /// Returns None if block deos not exist. + /// Returns None if block does not exist. fn uncles(&self, hash: &H256) -> Option> { self.block(hash).map(|bytes| BlockView::new(&bytes).uncles()) } @@ -227,6 +227,24 @@ impl BlockProvider for BlockChain { const COLLECTION_QUEUE_SIZE: usize = 8; +pub struct AncestryIter<'a> { + current: H256, + chain: &'a BlockChain, +} + +impl<'a> Iterator for AncestryIter<'a> { + type Item = H256; + fn next(&mut self) -> Option { + if self.current.is_zero() { + Option::None + } else { + let mut n = self.chain.block_details(&self.current).unwrap().parent; + mem::swap(&mut self.current, &mut n); + Some(n) + } + } +} + impl BlockChain { /// Create new instance of blockchain from given Genesis pub fn new(config: BlockChainConfig, genesis: &[u8], path: &Path) -> BlockChain { @@ -448,7 +466,8 @@ impl BlockChain { let mut write_details = self.block_details.write().unwrap(); for (hash, details) in update.block_details.into_iter() { batch.put_extras(&hash, &details); - write_details.insert(hash, details); + write_details.insert(hash.clone(), details); + self.note_used(CacheID::Extras(ExtrasIndex::BlockDetails, hash)); } let mut write_receipts = self.block_receipts.write().unwrap(); @@ -473,10 +492,35 @@ impl BlockChain { self.extras_db.write(batch).unwrap(); } - /// Given a block's `parent`, find every block header which represents a valid uncle. - pub fn find_uncle_headers(&self, _parent: &H256) -> Vec
{ - // TODO - Vec::new() + /// Iterator that lists `first` and then all of `first`'s ancestors, by hash. + pub fn ancestry_iter(&self, first: H256) -> Option { + if self.is_known(&first) { + Some(AncestryIter { + current: first, + chain: &self, + }) + } else { + None + } + } + + /// Given a block's `parent`, find every block header which represents a valid possible uncle. + pub fn find_uncle_headers(&self, parent: &H256, uncle_generations: usize) -> Option> { + if !self.is_known(parent) { return None; } + + let mut excluded = HashSet::new(); + for a in self.ancestry_iter(parent.clone()).unwrap().take(uncle_generations) { + excluded.extend(self.uncle_hashes(&a).unwrap().into_iter()); + excluded.insert(a); + } + + let mut ret = Vec::new(); + for a in self.ancestry_iter(parent.clone()).unwrap().skip(1).take(uncle_generations) { + ret.extend(self.block_details(&a).unwrap().children.iter() + .filter_map(|h| if excluded.contains(h) { None } else { self.block_header(h) }) + ); + } + Some(ret) } /// Get inserted block info which is critical to preapre extras updates. @@ -759,6 +803,14 @@ impl BlockChain { // TODO: handle block_hashes properly. block_hashes.clear(); + + blocks.shrink_to_fit(); + block_details.shrink_to_fit(); + block_hashes.shrink_to_fit(); + transaction_addresses.shrink_to_fit(); + block_logs.shrink_to_fit(); + blocks_blooms.shrink_to_fit(); + block_receipts.shrink_to_fit(); } if self.cache_size().total() < self.max_cache_size { break; } } @@ -809,6 +861,66 @@ mod tests { assert_eq!(bc.block_hash(2), None); } + #[test] + fn check_ancestry_iter() { + let mut canon_chain = ChainGenerator::default(); + let mut finalizer = BlockFinalizer::default(); + let genesis = canon_chain.generate(&mut finalizer).unwrap(); + let genesis_hash = BlockView::new(&genesis).header_view().sha3(); + + let temp = RandomTempPath::new(); + let bc = BlockChain::new(BlockChainConfig::default(), &genesis, temp.as_path()); + + let mut block_hashes = vec![genesis_hash.clone()]; + for _ in 0..10 { + let block = canon_chain.generate(&mut finalizer).unwrap(); + block_hashes.push(BlockView::new(&block).header_view().sha3()); + bc.insert_block(&block, vec![]); + } + + block_hashes.reverse(); + + assert_eq!(bc.ancestry_iter(block_hashes[0].clone()).unwrap().collect::>(), block_hashes) + } + + #[test] + #[cfg_attr(feature="dev", allow(cyclomatic_complexity))] + fn test_find_uncles() { + let mut canon_chain = ChainGenerator::default(); + let mut finalizer = BlockFinalizer::default(); + let genesis = canon_chain.generate(&mut finalizer).unwrap(); + let b1b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); + let b1a = canon_chain.generate(&mut finalizer).unwrap(); + let b2b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); + let b2a = canon_chain.generate(&mut finalizer).unwrap(); + let b3b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); + let b3a = canon_chain.generate(&mut finalizer).unwrap(); + let b4b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); + let b4a = canon_chain.generate(&mut finalizer).unwrap(); + let b5b = canon_chain.fork(1).generate(&mut finalizer.fork()).unwrap(); + let b5a = canon_chain.generate(&mut finalizer).unwrap(); + + let temp = RandomTempPath::new(); + let bc = BlockChain::new(BlockChainConfig::default(), &genesis, temp.as_path()); + bc.insert_block(&b1a, vec![]); + bc.insert_block(&b1b, vec![]); + bc.insert_block(&b2a, vec![]); + bc.insert_block(&b2b, vec![]); + bc.insert_block(&b3a, vec![]); + bc.insert_block(&b3b, vec![]); + 
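The two additions above work together: `ancestry_iter` walks `parent` and its ancestors by following `block_details(..).parent` links, and the rewritten `find_uncle_headers` builds an exclusion set out of the first `uncle_generations` ancestors plus their already-included uncles, then offers every other child of those ancestors (the `skip(1)` drops the parent itself) as a candidate uncle header. Below is a minimal, self-contained sketch of just the ancestry walk; the `Hash` and `Chain` types are hypothetical stand-ins for `H256` and `BlockChain`:

    use std::collections::HashMap;
    use std::mem;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct Hash(u64); // stand-in for H256; Hash(0) plays the role of H256::zero()

    struct Chain {
        parent_of: HashMap<Hash, Hash>, // child hash -> parent hash
    }

    struct AncestrySketch<'a> {
        current: Hash,
        chain: &'a Chain,
    }

    impl<'a> Iterator for AncestrySketch<'a> {
        type Item = Hash;
        fn next(&mut self) -> Option<Hash> {
            if self.current == Hash(0) {
                None // a zero current hash means genesis was already yielded
            } else {
                // Look up the parent, then swap it into `current` so the *old*
                // current is what gets yielded -- the same mem::swap trick as above.
                let mut n = *self.chain.parent_of.get(&self.current)?;
                mem::swap(&mut self.current, &mut n);
                Some(n)
            }
        }
    }

    fn main() {
        // genesis Hash(1) <- Hash(2) <- Hash(3): walking from Hash(3) yields 3, 2, 1.
        let chain = Chain {
            parent_of: vec![(Hash(3), Hash(2)), (Hash(2), Hash(1)), (Hash(1), Hash(0))]
                .into_iter()
                .collect(),
        };
        let walk: Vec<u64> = AncestrySketch { current: Hash(3), chain: &chain }
            .map(|h| h.0)
            .collect();
        assert_eq!(walk, vec![3, 2, 1]);
    }

The walk terminates on the zero hash because a genesis header's parent hash is all zeroes, so the iterator yields genesis last and then stops.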
bc.insert_block(&b4a, vec![]); + bc.insert_block(&b4b, vec![]); + bc.insert_block(&b5a, vec![]); + bc.insert_block(&b5b, vec![]); + + assert_eq!( + [&b4b, &b3b, &b2b].iter().map(|b| BlockView::new(b).header()).collect::>(), + bc.find_uncle_headers(&BlockView::new(&b4a).header_view().sha3(), 3).unwrap() + ); + + // TODO: insert block that already includes one of them as an uncle to check it's not allowed. + } + #[test] #[cfg_attr(feature="dev", allow(cyclomatic_complexity))] fn test_small_fork() { diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index fdcd6c057..852ba6a36 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -16,6 +16,8 @@ //! Blockchain database client. +use std::marker::PhantomData; +use std::sync::atomic::AtomicBool; use util::*; use util::panics::*; use blockchain::{BlockChain, BlockProvider}; @@ -35,6 +37,7 @@ use transaction::LocalizedTransaction; use extras::TransactionAddress; use filter::Filter; use log_entry::LocalizedLogEntry; +use util::keys::store::SecretStore; pub use block_queue::{BlockQueueConfig, BlockQueueInfo}; pub use blockchain::{TreeRoute, BlockChainConfig, CacheSize as BlockChainCacheSize}; @@ -76,12 +79,24 @@ pub enum BlockStatus { } /// Client configuration. Includes configs for all sub-systems. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct ClientConfig { /// Block queue configuration. pub queue: BlockQueueConfig, /// Blockchain configuration. pub blockchain: BlockChainConfig, + /// Prefer journal rather than archive. + pub prefer_journal: bool, +} + +impl Default for ClientConfig { + fn default() -> ClientConfig { + ClientConfig { + queue: Default::default(), + blockchain: Default::default(), + prefer_journal: false, + } + } } /// Information about the blockchain gathered together. @@ -126,6 +141,9 @@ pub trait BlockChainClient : Sync + Send { /// Get address nonce. fn nonce(&self, address: &Address) -> U256; + /// Get block hash. + fn block_hash(&self, id: BlockId) -> Option; + /// Get address code. fn code(&self, address: &Address) -> Option; @@ -188,7 +206,7 @@ impl ClientReport { /// Blockchain database client backed by a persistent database. Owns and manages a blockchain and a block queue. /// Call `import_block()` to import a block asynchronously; `flush_queue()` flushes the queue. -pub struct Client { +pub struct Client where V: Verifier { chain: Arc>, engine: Arc>, state_db: Mutex, @@ -198,21 +216,31 @@ pub struct Client { panic_handler: Arc, // for sealing... + sealing_enabled: AtomicBool, sealing_block: Mutex>, author: RwLock
, extra_data: RwLock, + verifier: PhantomData, + secret_store: Arc>, } const HISTORY: u64 = 1000; const CLIENT_DB_VER_STR: &'static str = "4.0"; -impl Client { +impl Client { /// Create a new client with given spec and DB path. pub fn new(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Result, Error> { + Client::::new_with_verifier(config, spec, path, message_channel) + } +} + +impl Client where V: Verifier { + /// Create a new client with given spec and DB path and custom verifier. + pub fn new_with_verifier(config: ClientConfig, spec: Spec, path: &Path, message_channel: IoChannel ) -> Result>, Error> { let mut dir = path.to_path_buf(); dir.push(H64::from(spec.genesis_header().hash()).hex()); //TODO: sec/fat: pruned/full versioning - dir.push(format!("v{}-sec-pruned", CLIENT_DB_VER_STR)); + dir.push(format!("v{}-sec-{}", CLIENT_DB_VER_STR, if config.prefer_journal { "pruned" } else { "archive" })); let path = dir.as_path(); let gb = spec.genesis_block(); let chain = Arc::new(RwLock::new(BlockChain::new(config.blockchain, &gb, path))); @@ -220,7 +248,7 @@ impl Client { state_path.push("state"); let engine = Arc::new(try!(spec.to_engine())); - let mut state_db = JournalDB::new(state_path.to_str().unwrap()); + let mut state_db = JournalDB::from_prefs(state_path.to_str().unwrap(), config.prefer_journal); if state_db.is_empty() && engine.spec().ensure_db_good(&mut state_db) { state_db.commit(0, &engine.spec().genesis_header().hash(), None).expect("Error commiting genesis state to state DB"); } @@ -229,6 +257,9 @@ impl Client { let panic_handler = PanicHandler::new_in_arc(); panic_handler.forward_from(&block_queue); + let secret_store = Arc::new(RwLock::new(SecretStore::new())); + secret_store.write().unwrap().try_import_existing(); + Ok(Arc::new(Client { chain: chain, engine: engine, @@ -237,9 +268,12 @@ impl Client { report: RwLock::new(Default::default()), import_lock: Mutex::new(()), panic_handler: panic_handler, + sealing_enabled: AtomicBool::new(false), sealing_block: Mutex::new(None), author: RwLock::new(Address::new()), extra_data: RwLock::new(Vec::new()), + verifier: PhantomData, + secret_store: secret_store, })) } @@ -264,6 +298,11 @@ impl Client { last_hashes } + /// Secret store (key manager) + pub fn secret_store(&self) -> &Arc> { + &self.secret_store + } + fn check_and_close_block(&self, block: &PreverifiedBlock) -> Result { let engine = self.engine.deref().deref(); let header = &block.header; @@ -276,7 +315,7 @@ impl Client { } // Verify Block Family - let verify_family_result = verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); + let verify_family_result = V::verify_block_family(&header, &block.bytes, engine, self.chain.read().unwrap().deref()); if let Err(e) = verify_family_result { warn!(target: "client", "Stage 3 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); return Err(()); @@ -302,7 +341,7 @@ impl Client { // Final Verification let closed_block = enact_result.unwrap(); - if let Err(e) = verify_block_final(&header, closed_block.block().header()) { + if let Err(e) = V::verify_block_final(&header, closed_block.block().header()) { warn!(target: "client", "Stage 4 block verification failed for #{} ({})\nError: {:?}", header.number(), header.hash(), e); return Err(()); } @@ -374,12 +413,12 @@ impl Client { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - bad: bad_blocks, 
+ retracted: bad_blocks, })).unwrap(); } } - if self.chain_info().best_block_hash != original_best { + if self.chain_info().best_block_hash != original_best && self.sealing_enabled.load(atomic::Ordering::Relaxed) { self.prepare_sealing(); } @@ -462,7 +501,7 @@ impl Client { self.extra_data() ); - self.chain.read().unwrap().find_uncle_headers(&h).into_iter().foreach(|h| { b.push_uncle(h).unwrap(); }); + self.chain.read().unwrap().find_uncle_headers(&h, self.engine.deref().deref().maximum_uncle_age()).unwrap().into_iter().take(self.engine.deref().deref().maximum_uncle_count()).foreach(|h| { b.push_uncle(h).unwrap(); }); // TODO: push transactions. @@ -474,6 +513,8 @@ impl Client { /// Grab the `ClosedBlock` that we want to be sealed. Comes as a mutex that you have to lock. pub fn sealing_block(&self) -> &Mutex> { if self.sealing_block.lock().unwrap().is_none() { + self.sealing_enabled.store(true, atomic::Ordering::Relaxed); + // TODO: Above should be on a timer that resets after two blocks have arrived without being asked for. self.prepare_sealing(); } &self.sealing_block @@ -505,7 +546,7 @@ impl Client { // TODO: need MinerService MinerIoHandler -impl BlockChainClient for Client { +impl BlockChainClient for Client where V: Verifier { fn block_header(&self, id: BlockId) -> Option { let chain = self.chain.read().unwrap(); Self::block_hash(&chain, id).and_then(|hash| chain.block(&hash).map(|bytes| BlockView::new(&bytes).rlp().at(0).as_raw().to_vec())) @@ -549,6 +590,11 @@ impl BlockChainClient for Client { self.state().nonce(address) } + fn block_hash(&self, id: BlockId) -> Option { + let chain = self.chain.read().unwrap(); + Self::block_hash(&chain, id) + } + fn code(&self, address: &Address) -> Option { self.state().code(address) } diff --git a/ethcore/src/engine.rs b/ethcore/src/engine.rs index d607ce2e2..83e1986fd 100644 --- a/ethcore/src/engine.rs +++ b/ethcore/src/engine.rs @@ -47,6 +47,8 @@ pub trait Engine : Sync + Send { fn maximum_extra_data_size(&self) -> usize { decode(&self.spec().engine_params.get("maximumExtraDataSize").unwrap()) } /// Maximum number of uncles a block is allowed to declare. fn maximum_uncle_count(&self) -> usize { 2 } + /// The number of generations back that uncles can be. + fn maximum_uncle_age(&self) -> usize { 6 } /// The nonce with which accounts begin. fn account_start_nonce(&self) -> U256 { decode(&self.spec().engine_params.get("accountStartNonce").unwrap()) } diff --git a/ethcore/src/filter.rs b/ethcore/src/filter.rs index 95c5687a7..9bfebf52f 100644 --- a/ethcore/src/filter.rs +++ b/ethcore/src/filter.rs @@ -42,6 +42,22 @@ pub struct Filter { pub topics: [Option>; 4], } +impl Clone for Filter { + fn clone(&self) -> Self { + let mut topics = [None, None, None, None]; + for i in 0..4 { + topics[i] = self.topics[i].clone(); + } + + Filter { + from_block: self.from_block.clone(), + to_block: self.to_block.clone(), + address: self.address.clone(), + topics: topics + } + } +} + impl Filter { /// Returns combinations of each address and topic. pub fn bloom_possibilities(&self) -> Vec { diff --git a/ethcore/src/header.rs b/ethcore/src/header.rs index cc02d84db..1e1a54d57 100644 --- a/ethcore/src/header.rs +++ b/ethcore/src/header.rs @@ -29,7 +29,7 @@ pub type BlockNumber = u64; /// which is non-specific. /// /// Doesn't do all that much on its own. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq)] pub struct Header { // TODO: make all private. /// Parent hash. 
@@ -70,6 +70,25 @@ pub struct Header { pub bare_hash: RefCell>, } +impl PartialEq for Header { + fn eq(&self, c: &Header) -> bool { + self.parent_hash == c.parent_hash && + self.timestamp == c.timestamp && + self.number == c.number && + self.author == c.author && + self.transactions_root == c.transactions_root && + self.uncles_hash == c.uncles_hash && + self.extra_data == c.extra_data && + self.state_root == c.state_root && + self.receipts_root == c.receipts_root && + self.log_bloom == c.log_bloom && + self.gas_used == c.gas_used && + self.gas_limit == c.gas_limit && + self.difficulty == c.difficulty && + self.seal == c.seal + } +} + impl Default for Header { fn default() -> Self { Header { diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 756d02407..a80adb0ba 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,7 +30,7 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain - bad: Vec, + retracted: Vec, }, /// A block is ready BlockVerified, diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 65c9d7358..001d1729b 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -108,6 +108,25 @@ fn can_collect_garbage() { assert!(client.blockchain_cache_info().blocks < 100 * 1024); } +#[test] +fn can_handle_long_fork() { + let client_result = generate_dummy_client(1200); + let client = client_result.reference(); + for _ in 0..10 { + client.import_verified_blocks(&IoChannel::disconnected()); + } + assert_eq!(1200, client.chain_info().best_block_number); + + push_blocks_to_client(client, 45, 1201, 800); + push_blocks_to_client(client, 49, 1201, 800); + push_blocks_to_client(client, 53, 1201, 600); + + for _ in 0..20 { + client.import_verified_blocks(&IoChannel::disconnected()); + } + assert_eq!(2000, client.chain_info().best_block_number); +} + #[test] fn can_mine() { let dummy_blocks = get_good_dummy_block_seq(2); @@ -122,7 +141,7 @@ fn can_mine() { b.hash() } None => { panic!(); } - } + } }; assert!(client.submit_seal(pow_hash, vec![]).is_ok()); -} \ No newline at end of file +} diff --git a/ethcore/src/tests/helpers.rs b/ethcore/src/tests/helpers.rs index 44ad667b9..bb9a44614 100644 --- a/ethcore/src/tests/helpers.rs +++ b/ethcore/src/tests/helpers.rs @@ -156,10 +156,9 @@ pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult rolling_block_number = rolling_block_number + 1; rolling_timestamp = rolling_timestamp + 10; - if let Err(_) = client.import_block(create_test_block(&header)) { - panic!("error importing block which is valid by definition"); + if let Err(e) = client.import_block(create_test_block(&header)) { + panic!("error importing block which is valid by definition: {:?}", e); } - } client.flush_queue(); client.import_verified_blocks(&IoChannel::disconnected()); @@ -170,6 +169,34 @@ pub fn generate_dummy_client(block_number: u32) -> GuardedTempResult } } +pub fn push_blocks_to_client(client: &Arc, timestamp_salt: u64, starting_number: usize, block_number: usize) { + let test_spec = get_test_spec(); + let test_engine = test_spec.to_engine().unwrap(); + let state_root = test_engine.spec().genesis_header().state_root; + let mut rolling_hash = client.chain_info().best_block_hash; + let mut rolling_block_number = starting_number as u64; + let mut rolling_timestamp = timestamp_salt + starting_number as u64 * 10; + + for _ in 0..block_number { + let mut header = Header::new(); + + header.gas_limit = 
decode(test_engine.spec().engine_params.get("minGasLimit").unwrap()); + header.difficulty = decode(test_engine.spec().engine_params.get("minimumDifficulty").unwrap()); + header.timestamp = rolling_timestamp; + header.number = rolling_block_number; + header.parent_hash = rolling_hash; + header.state_root = state_root.clone(); + + rolling_hash = header.hash(); + rolling_block_number = rolling_block_number + 1; + rolling_timestamp = rolling_timestamp + 10; + + if let Err(e) = client.import_block(create_test_block(&header)) { + panic!("error importing block which is valid by definition: {:?}", e); + } + } +} + pub fn get_test_client_with_blocks(blocks: Vec) -> GuardedTempResult> { let dir = RandomTempPath::new(); let client = Client::new(ClientConfig::default(), get_test_spec(), dir.as_path(), IoChannel::disconnected()).unwrap(); @@ -253,18 +280,29 @@ pub fn get_temp_state_in(path: &Path) -> State { pub fn get_good_dummy_block_seq(count: usize) -> Vec { let test_spec = get_test_spec(); let test_engine = test_spec.to_engine().unwrap(); - let mut parent = test_engine.spec().genesis_header().hash(); + get_good_dummy_block_fork_seq(1, count, &test_engine.spec().genesis_header().hash()) +} + +pub fn get_good_dummy_block_fork_seq(start_number: usize, count: usize, parent_hash: &H256) -> Vec { + let test_spec = get_test_spec(); + let test_engine = test_spec.to_engine().unwrap(); + let mut rolling_timestamp = start_number as u64 * 10; + let mut parent = *parent_hash; let mut r = Vec::new(); - for i in 1 .. count + 1 { + for i in start_number .. start_number + count + 1 { let mut block_header = Header::new(); block_header.gas_limit = decode(test_engine.spec().engine_params.get("minGasLimit").unwrap()); - block_header.difficulty = decode(test_engine.spec().engine_params.get("minimumDifficulty").unwrap()); - block_header.timestamp = i as u64; + block_header.difficulty = U256::from(i).mul(U256([0, 1, 0, 0])); + block_header.timestamp = rolling_timestamp; block_header.number = i as u64; block_header.parent_hash = parent; block_header.state_root = test_engine.spec().genesis_header().state_root; + parent = block_header.hash(); + rolling_timestamp = rolling_timestamp + 10; + r.push(create_test_block(&block_header)); + } r } diff --git a/ethcore/src/verification/canon_verifier.rs b/ethcore/src/verification/canon_verifier.rs new file mode 100644 index 000000000..30e368f1b --- /dev/null +++ b/ethcore/src/verification/canon_verifier.rs @@ -0,0 +1,34 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . 
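One detail of `get_good_dummy_block_fork_seq` above is worth unpacking: each header gets difficulty `U256::from(i).mul(U256([0, 1, 0, 0]))`. Assuming bigint's `U256` stores its four 64-bit limbs least significant first (an assumption of this note), `U256([0, 1, 0, 0])` equals 2^64, so block `i` carries difficulty `i * 2^64` and a chain's total difficulty is an arithmetic series. Whether per-block difficulty is constant (as in `push_blocks_to_client`, which uses `minimumDifficulty`) or increasing like this, a longer fork from the same parent accumulates strictly more total difficulty — which is what `can_handle_long_fork`'s expectation of best block 2000 rests on. A quick `u128` check of the series arithmetic:

    // Total difficulty of `count` blocks starting at number `start`, where block i
    // has difficulty i * 2^64 as in get_good_dummy_block_fork_seq. u128 stands in
    // for U256 here; all values involved stay well below 2^128.
    fn total_difficulty(start: u128, count: u128) -> u128 {
        let limb = 1u128 << 64; // value of U256([0, 1, 0, 0]), assuming LS-limb-first
        // arithmetic series: sum of (start + k) * limb for k in 0..count
        limb * (count * start + count * (count - 1) / 2)
    }

    fn main() {
        // Forks from the same parent at block 1201, as in can_handle_long_fork:
        assert!(total_difficulty(1201, 800) > total_difficulty(1201, 600));
        // and any strictly longer chain of this shape out-weighs a shorter one:
        assert!(total_difficulty(1201, 800) > total_difficulty(1201, 799));
    }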
+ +use blockchain::BlockProvider; +use engine::Engine; +use error::Error; +use header::Header; +use super::Verifier; +use super::verification; + +pub struct CanonVerifier; + +impl Verifier for CanonVerifier { + fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error> { + verification::verify_block_family(header, bytes, engine, bc) + } + + fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error> { + verification::verify_block_final(expected, got) + } +} diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs new file mode 100644 index 000000000..260121989 --- /dev/null +++ b/ethcore/src/verification/mod.rs @@ -0,0 +1,25 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +pub mod verification; +pub mod verifier; +mod canon_verifier; +mod noop_verifier; + +pub use self::verification::*; +pub use self::verifier::Verifier; +pub use self::canon_verifier::CanonVerifier; +pub use self::noop_verifier::NoopVerifier; diff --git a/ethcore/src/verification/noop_verifier.rs b/ethcore/src/verification/noop_verifier.rs new file mode 100644 index 000000000..ae2a153fe --- /dev/null +++ b/ethcore/src/verification/noop_verifier.rs @@ -0,0 +1,33 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use blockchain::BlockProvider; +use engine::Engine; +use error::Error; +use header::Header; +use super::Verifier; + +pub struct NoopVerifier; + +impl Verifier for NoopVerifier { + fn verify_block_family(_header: &Header, _bytes: &[u8], _engine: &Engine, _bc: &BlockProvider) -> Result<(), Error> { + Ok(()) + } + + fn verify_block_final(_expected: &Header, _got: &Header) -> Result<(), Error> { + Ok(()) + } +} diff --git a/ethcore/src/verification.rs b/ethcore/src/verification/verification.rs similarity index 98% rename from ethcore/src/verification.rs rename to ethcore/src/verification/verification.rs index f52e2e1e4..ed3db3791 100644 --- a/ethcore/src/verification.rs +++ b/ethcore/src/verification/verification.rs @@ -61,7 +61,7 @@ pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> for u in Rlp::new(&bytes).at(2).iter().map(|rlp| rlp.as_val::
()) { try!(engine.verify_block_unordered(&u, None)); } - // Verify transactions. + // Verify transactions. let mut transactions = Vec::new(); { let v = BlockView::new(&bytes); @@ -78,7 +78,7 @@ pub fn verify_block_unordered(header: Header, bytes: Bytes, engine: &Engine) -> } /// Phase 3 verification. Check block information against parent and uncles. -pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BC) -> Result<(), Error> where BC: BlockProvider { +pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error> { // TODO: verify timestamp let parent = try!(bc.block_header(&header.parent_hash).ok_or_else(|| Error::from(BlockError::UnknownParent(header.parent_hash.clone())))); try!(verify_parent(&header, &parent)); @@ -94,7 +94,7 @@ pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, b excluded.insert(header.hash()); let mut hash = header.parent_hash.clone(); excluded.insert(hash.clone()); - for _ in 0..6 { + for _ in 0..engine.maximum_uncle_age() { match bc.block_details(&hash) { Some(details) => { excluded.insert(details.parent.clone()); @@ -121,7 +121,7 @@ pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, b // (8 Invalid) let depth = if header.number > uncle.number { header.number - uncle.number } else { 0 }; - if depth > 6 { + if depth > engine.maximum_uncle_age() as u64 { return Err(From::from(BlockError::UncleTooOld(OutOfBounds { min: Some(header.number - depth), max: Some(header.number - 1), found: uncle.number }))); } else if depth < 1 { @@ -141,7 +141,7 @@ pub fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, b let uncle_parent = try!(bc.block_header(&uncle.parent_hash).ok_or_else(|| Error::from(BlockError::UnknownUncleParent(uncle.parent_hash.clone())))); for _ in 0..depth { match bc.block_details(&expected_uncle_parent) { - Some(details) => { + Some(details) => { expected_uncle_parent = details.parent; }, None => break @@ -468,7 +468,7 @@ mod tests { header.number = 9; check_fail(family_test(&create_test_block_with_data(&header, &good_transactions, &good_uncles), engine.deref(), &bc), InvalidNumber(Mismatch { expected: parent.number + 1, found: header.number })); - + header = good.clone(); let mut bad_uncles = good_uncles.clone(); bad_uncles.push(good_uncle1.clone()); diff --git a/ethcore/src/verification/verifier.rs b/ethcore/src/verification/verifier.rs new file mode 100644 index 000000000..cc5edce29 --- /dev/null +++ b/ethcore/src/verification/verifier.rs @@ -0,0 +1,26 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +use blockchain::BlockProvider; +use engine::Engine; +use error::Error; +use header::Header; + +/// Should be used to verify blocks. 
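// Editorial sketch: the point of the trait declared just below is that Client can be
// made generic over its verifier and pick the implementation at compile time.
// Client<V: Verifier> stores only a PhantomData<V> marker (the `verifier: PhantomData`
// field added in client.rs above) and calls V::verify_block_family /
// V::verify_block_final by static dispatch, so tests can plug in NoopVerifier at no
// runtime cost. The types below are simplified and hypothetical; the engine and
// block-provider arguments are elided:

#[derive(PartialEq)]
struct Header { number: u64 }
struct Error;

trait VerifierSketch {
    fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error>;
}

struct CanonSketch;
impl VerifierSketch for CanonSketch {
    fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error> {
        // stands in for the field-by-field check the real verify_block_final does
        if expected == got { Ok(()) } else { Err(Error) }
    }
}

struct NoopSketch;
impl VerifierSketch for NoopSketch {
    fn verify_block_final(_: &Header, _: &Header) -> Result<(), Error> {
        Ok(()) // accept anything, as NoopVerifier does
    }
}

use std::marker::PhantomData;

struct ClientSketch<V: VerifierSketch> {
    // Records the type parameter without storing a value, like Client's new field.
    verifier: PhantomData<V>,
}

impl<V: VerifierSketch> ClientSketch<V> {
    fn check(&self, expected: &Header, got: &Header) -> Result<(), Error> {
        V::verify_block_final(expected, got) // statically dispatched
    }
}

fn main() {
    let strict = ClientSketch::<CanonSketch> { verifier: PhantomData };
    let lax = ClientSketch::<NoopSketch> { verifier: PhantomData };
    assert!(strict.check(&Header { number: 1 }, &Header { number: 2 }).is_err());
    assert!(lax.check(&Header { number: 1 }, &Header { number: 2 }).is_ok());
}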
+pub trait Verifier: Send + Sync { + fn verify_block_family(header: &Header, bytes: &[u8], engine: &Engine, bc: &BlockProvider) -> Result<(), Error>; + fn verify_block_final(expected: &Header, got: &Header) -> Result<(), Error>; +} diff --git a/parity/main.rs b/parity/main.rs index b991f36cd..3f4243a0a 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -65,6 +65,7 @@ Usage: Options: --chain CHAIN Specify the blockchain type. CHAIN may be either a JSON chain specification file or frontier, mainnet, morden, or testnet [default: frontier]. + --archive Client should not prune the state/storage trie. -d --db-path PATH Specify the database & configuration directory path [default: $HOME/.parity] --keys-path PATH Specify the path for JSON key files to be found [default: $HOME/.web3/keys] @@ -102,6 +103,7 @@ struct Args { flag_chain: String, flag_db_path: String, flag_keys_path: String, + flag_archive: bool, flag_no_bootstrap: bool, flag_listen_address: String, flag_public_address: Option, @@ -311,6 +313,7 @@ impl Configuration { let mut client_config = ClientConfig::default(); client_config.blockchain.pref_cache_size = self.args.flag_cache_pref_size; client_config.blockchain.max_cache_size = self.args.flag_cache_max_size; + client_config.prefer_journal = !self.args.flag_archive; client_config.queue.max_mem_use = self.args.flag_queue_max_size; let mut service = ClientService::start(client_config, spec, net_settings, &Path::new(&self.path())).unwrap(); let client = service.client().clone(); diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 086fb19c1..bfdf8f2d3 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -20,6 +20,7 @@ ethash = { path = "../ethash" } ethsync = { path = "../sync" } clippy = { version = "0.0.44", optional = true } rustc-serialize = "0.3" +transient-hashmap = "0.1" serde_macros = { version = "0.7.0", optional = true } [build-dependencies] diff --git a/rpc/build.rs b/rpc/build.rs index fe1a55694..b5adeaba1 100644 --- a/rpc/build.rs +++ b/rpc/build.rs @@ -9,8 +9,8 @@ mod inner { pub fn main() { let out_dir = env::var_os("OUT_DIR").unwrap(); - let src = Path::new("src/lib.rs.in"); - let dst = Path::new(&out_dir).join("lib.rs"); + let src = Path::new("src/v1/types/mod.rs.in"); + let dst = Path::new(&out_dir).join("mod.rs"); let mut registry = syntex::Registry::new(); diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8dd58d0b8..0653a0c33 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -27,9 +27,35 @@ extern crate jsonrpc_http_server; extern crate ethcore_util as util; extern crate ethcore; extern crate ethsync; +extern crate transient_hashmap; -#[cfg(feature = "serde_macros")] -include!("lib.rs.in"); +use self::jsonrpc_core::{IoHandler, IoDelegate}; -#[cfg(not(feature = "serde_macros"))] -include!(concat!(env!("OUT_DIR"), "/lib.rs")); +pub mod v1; + +/// Http server. +pub struct HttpServer { + handler: IoHandler, + threads: usize +} + +impl HttpServer { + /// Construct new http server object with given number of threads. + pub fn new(threads: usize) -> HttpServer { + HttpServer { + handler: IoHandler::new(), + threads: threads + } + } + + /// Add io delegate. 
+ pub fn add_delegate(&mut self, delegate: IoDelegate) where D: Send + Sync + 'static { + self.handler.add_delegate(delegate); + } + + /// Start server asynchronously in new thread + pub fn start_async(self, addr: &str, cors_domain: &str) { + let server = jsonrpc_http_server::Server::new(self.handler, self.threads); + server.start_async(addr, jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain.to_owned())) + } +} diff --git a/rpc/src/lib.rs.in b/rpc/src/lib.rs.in deleted file mode 100644 index e17f2b3bb..000000000 --- a/rpc/src/lib.rs.in +++ /dev/null @@ -1,30 +0,0 @@ -use self::jsonrpc_core::{IoHandler, IoDelegate}; - -pub mod v1; - -/// Http server. -pub struct HttpServer { - handler: IoHandler, - threads: usize -} - -impl HttpServer { - /// Construct new http server object with given number of threads. - pub fn new(threads: usize) -> HttpServer { - HttpServer { - handler: IoHandler::new(), - threads: threads - } - } - - /// Add io delegate. - pub fn add_delegate(&mut self, delegate: IoDelegate) where D: Send + Sync + 'static { - self.handler.add_delegate(delegate); - } - - /// Start server asynchronously in new thread - pub fn start_async(self, addr: &str, cors_domain: &str) { - let server = jsonrpc_http_server::Server::new(self.handler, self.threads); - server.start_async(addr, jsonrpc_http_server::AccessControlAllowOrigin::Value(cors_domain.to_owned())) - } -} diff --git a/rpc/src/v1/helpers/mod.rs b/rpc/src/v1/helpers/mod.rs new file mode 100644 index 000000000..b1a5c05ba --- /dev/null +++ b/rpc/src/v1/helpers/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod poll_manager; +mod poll_filter; + +pub use self::poll_manager::PollManager; +pub use self::poll_filter::PollFilter; diff --git a/rpc/src/v1/helpers/poll_filter.rs b/rpc/src/v1/helpers/poll_filter.rs new file mode 100644 index 000000000..465290270 --- /dev/null +++ b/rpc/src/v1/helpers/poll_filter.rs @@ -0,0 +1,10 @@ +//! Helper type with all filter possibilities. + +use ethcore::filter::Filter; + +#[derive(Clone)] +pub enum PollFilter { + Block, + PendingTransaction, + Logs(Filter) +} diff --git a/rpc/src/v1/helpers/poll_manager.rs b/rpc/src/v1/helpers/poll_manager.rs new file mode 100644 index 000000000..36a6352c2 --- /dev/null +++ b/rpc/src/v1/helpers/poll_manager.rs @@ -0,0 +1,144 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Indexes all rpc poll requests. + +use transient_hashmap::{TransientHashMap, Timer, StandardTimer}; + +/// Lifetime of poll (in seconds). +const POLL_LIFETIME: u64 = 60; + +pub type PollId = usize; +pub type BlockNumber = u64; + +pub struct PollInfo { + pub filter: F, + pub block_number: BlockNumber +} + +impl Clone for PollInfo where F: Clone { + fn clone(&self) -> Self { + PollInfo { + filter: self.filter.clone(), + block_number: self.block_number.clone() + } + } +} + +/// Indexes all poll requests. +/// +/// Lazily garbage collects unused polls info. +pub struct PollManager where T: Timer { + polls: TransientHashMap, T>, + next_available_id: PollId +} + +impl PollManager { + /// Creates new instance of indexer. + pub fn new() -> Self { + PollManager::new_with_timer(Default::default()) + } +} + +impl PollManager where T: Timer { + pub fn new_with_timer(timer: T) -> Self { + PollManager { + polls: TransientHashMap::new_with_timer(POLL_LIFETIME, timer), + next_available_id: 0, + } + } + + /// Returns id which can be used for new poll. + /// + /// Stores information when last poll happend. + pub fn create_poll(&mut self, filter: F, block: BlockNumber) -> PollId { + self.polls.prune(); + let id = self.next_available_id; + self.next_available_id += 1; + self.polls.insert(id, PollInfo { + filter: filter, + block_number: block, + }); + id + } + + /// Updates information when last poll happend. + pub fn update_poll(&mut self, id: &PollId, block: BlockNumber) { + self.polls.prune(); + if let Some(info) = self.polls.get_mut(id) { + info.block_number = block; + } + } + + /// Returns number of block when last poll happend. + pub fn get_poll_info(&mut self, id: &PollId) -> Option<&PollInfo> { + self.polls.prune(); + self.polls.get(id) + } + + /// Removes poll info. + pub fn remove_poll(&mut self, id: &PollId) { + self.polls.remove(id); + } +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use transient_hashmap::Timer; + use v1::helpers::PollManager; + + struct TestTimer<'a> { + time: &'a RefCell, + } + + impl<'a> Timer for TestTimer<'a> { + fn get_time(&self) -> i64 { + *self.time.borrow() + } + } + + #[test] + fn test_poll_indexer() { + let time = RefCell::new(0); + let timer = TestTimer { + time: &time, + }; + + let mut indexer = PollManager::new_with_timer(timer); + assert_eq!(indexer.create_poll(false, 20), 0); + assert_eq!(indexer.create_poll(true, 20), 1); + + *time.borrow_mut() = 10; + indexer.update_poll(&0, 21); + assert_eq!(indexer.get_poll_info(&0).unwrap().filter, false); + assert_eq!(indexer.get_poll_info(&0).unwrap().block_number, 21); + + *time.borrow_mut() = 30; + indexer.update_poll(&1, 23); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + *time.borrow_mut() = 75; + indexer.update_poll(&0, 30); + assert!(indexer.get_poll_info(&0).is_none()); + assert_eq!(indexer.get_poll_info(&1).unwrap().filter, true); + assert_eq!(indexer.get_poll_info(&1).unwrap().block_number, 23); + + indexer.remove_poll(&1); + assert!(indexer.get_poll_info(&1).is_none()); + } +} diff --git a/rpc/src/v1/impls/eth.rs b/rpc/src/v1/impls/eth.rs index d85286196..0bc843d57 100644 --- a/rpc/src/v1/impls/eth.rs +++ b/rpc/src/v1/impls/eth.rs @@ -15,11 +15,12 @@ // along with Parity. If not, see . //! Eth rpc implementation. 
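The `prune()` call at the top of every `PollManager` method above is the entire garbage-collection story: `TransientHashMap` drops entries older than `POLL_LIFETIME` lazily on access, so no background reaper thread is needed and an abandoned filter simply ages out. A self-contained sketch of that lazy-expiry idea — the `ExpiringMap` type here is hypothetical, built on `std::time::Instant` rather than the transient-hashmap crate's `Timer` abstraction:

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    struct ExpiringMap<K, V> {
        lifetime: Duration,
        entries: HashMap<K, (Instant, V)>, // each value tagged with insertion time
    }

    impl<K: std::hash::Hash + Eq, V> ExpiringMap<K, V> {
        fn new(lifetime: Duration) -> Self {
            ExpiringMap { lifetime, entries: HashMap::new() }
        }

        // Lazy garbage collection: called at the top of every accessor, mirroring
        // the self.polls.prune() calls in PollManager above.
        fn prune(&mut self) {
            let lifetime = self.lifetime;
            self.entries.retain(|_, (born, _)| born.elapsed() < lifetime);
        }

        fn insert(&mut self, k: K, v: V) {
            self.prune();
            self.entries.insert(k, (Instant::now(), v));
        }

        fn get(&mut self, k: &K) -> Option<&V> {
            self.prune();
            self.entries.get(k).map(|(_, v)| v)
        }

        fn remove(&mut self, k: &K) -> Option<V> {
            self.entries.remove(k).map(|(_, v)| v)
        }
    }

    fn main() {
        // Poll ids map to filter descriptions; entries vanish 60s after insertion.
        let mut polls = ExpiringMap::new(Duration::from_secs(60));
        polls.insert(0usize, "block filter");
        assert_eq!(polls.get(&0), Some(&"block filter"));
        polls.remove(&0);
        assert!(polls.get(&0).is_none());
    }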
+use std::collections::HashMap; +use std::sync::{Arc, Weak, Mutex, RwLock}; use ethsync::{EthSync, SyncState}; use jsonrpc_core::*; use util::numbers::*; use util::sha3::*; -use util::standard::{RwLock, HashMap, Arc, Weak}; use util::rlp::encode; use ethcore::client::*; use ethcore::block::{IsBlock}; @@ -29,6 +30,7 @@ use ethcore::ethereum::Ethash; use ethcore::ethereum::denominations::shannon; use v1::traits::{Eth, EthFilter}; use v1::types::{Block, BlockTransactions, BlockNumber, Bytes, SyncStatus, SyncInfo, Transaction, OptionalValue, Index, Filter, Log}; +use v1::helpers::{PollFilter, PollManager}; /// Eth rpc implementation. pub struct EthClient { @@ -266,28 +268,98 @@ impl Eth for EthClient { /// Eth filter rpc implementation. pub struct EthFilterClient { - client: Weak + client: Weak, + polls: Mutex>, } impl EthFilterClient { /// Creates new Eth filter client. pub fn new(client: &Arc) -> Self { EthFilterClient { - client: Arc::downgrade(client) + client: Arc::downgrade(client), + polls: Mutex::new(PollManager::new()) } } } impl EthFilter for EthFilterClient { - fn new_block_filter(&self, _params: Params) -> Result { - Ok(Value::U64(0)) + fn new_filter(&self, params: Params) -> Result { + from_params::<(Filter,)>(params) + .and_then(|(filter,)| { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Logs(filter.into()), take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }) } - fn new_pending_transaction_filter(&self, _params: Params) -> Result { - Ok(Value::U64(1)) + fn new_block_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::Block, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } } - fn filter_changes(&self, _: Params) -> Result { - to_value(&take_weak!(self.client).chain_info().best_block_hash).map(|v| Value::Array(vec![v])) + fn new_pending_transaction_filter(&self, params: Params) -> Result { + match params { + Params::None => { + let mut polls = self.polls.lock().unwrap(); + let id = polls.create_poll(PollFilter::PendingTransaction, take_weak!(self.client).chain_info().best_block_number); + to_value(&U256::from(id)) + }, + _ => Err(Error::invalid_params()) + } + } + + fn filter_changes(&self, params: Params) -> Result { + let client = take_weak!(self.client); + from_params::<(Index,)>(params) + .and_then(|(index,)| { + let info = self.polls.lock().unwrap().get_poll_info(&index.value()).cloned(); + match info { + None => Ok(Value::Array(vec![] as Vec)), + Some(info) => match info.filter { + PollFilter::Block => { + let current_number = client.chain_info().best_block_number; + let hashes = (info.block_number..current_number).into_iter() + .map(BlockId::Number) + .filter_map(|id| client.block_hash(id)) + .collect::>(); + + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&hashes) + }, + PollFilter::PendingTransaction => { + // TODO: fix implementation once TransactionQueue is merged + to_value(&vec![] as &Vec) + }, + PollFilter::Logs(mut filter) => { + filter.from_block = BlockId::Number(info.block_number); + filter.to_block = BlockId::Latest; + let logs = client.logs(filter) + .into_iter() + .map(From::from) + .collect::>(); + + let current_number = client.chain_info().best_block_number; + self.polls.lock().unwrap().update_poll(&index.value(), current_number); + + to_value(&logs) + 
} + } + } + }) + } + + fn uninstall_filter(&self, params: Params) -> Result { + from_params::<(Index,)>(params) + .and_then(|(index,)| { + self.polls.lock().unwrap().remove_poll(&index.value()); + to_value(&true) + }) } } diff --git a/rpc/src/v1/impls/mod.rs b/rpc/src/v1/impls/mod.rs index d9102b1db..10d451e9f 100644 --- a/rpc/src/v1/impls/mod.rs +++ b/rpc/src/v1/impls/mod.rs @@ -28,7 +28,9 @@ macro_rules! take_weak { mod web3; mod eth; mod net; +mod personal; pub use self::web3::Web3Client; pub use self::eth::{EthClient, EthFilterClient}; pub use self::net::NetClient; +pub use self::personal::PersonalClient; diff --git a/rpc/src/v1/impls/personal.rs b/rpc/src/v1/impls/personal.rs new file mode 100644 index 000000000..48e1b1c6a --- /dev/null +++ b/rpc/src/v1/impls/personal.rs @@ -0,0 +1,78 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Account management (personal) rpc implementation +use std::sync::{Arc, Weak}; +use jsonrpc_core::*; +use v1::traits::Personal; +use util::keys::store::*; +use util::Address; +use std::sync::RwLock; + +/// Account management (personal) rpc implementation. +pub struct PersonalClient { + secret_store: Weak>, +} + +impl PersonalClient { + /// Creates new PersonalClient + pub fn new(store: &Arc>) -> Self { + PersonalClient { + secret_store: Arc::downgrade(store), + } + } +} + +impl Personal for PersonalClient { + fn accounts(&self, _: Params) -> Result { + let store_wk = take_weak!(self.secret_store); + let store = store_wk.read().unwrap(); + match store.accounts() { + Ok(account_list) => { + Ok(Value::Array(account_list.iter() + .map(|&(account, _)| Value::String(format!("{:?}", account))) + .collect::>()) + ) + } + Err(_) => Err(Error::internal_error()) + } + } + + fn new_account(&self, params: Params) -> Result { + from_params::<(String, )>(params).and_then( + |(pass, )| { + let store_wk = take_weak!(self.secret_store); + let mut store = store_wk.write().unwrap(); + match store.new_account(&pass) { + Ok(address) => Ok(Value::String(format!("{:?}", address))), + Err(_) => Err(Error::internal_error()) + } + } + ) + } + + fn unlock_account(&self, params: Params) -> Result { + from_params::<(Address, String, u64)>(params).and_then( + |(account, account_pass, _)|{ + let store_wk = take_weak!(self.secret_store); + let store = store_wk.read().unwrap(); + match store.unlock_account(&account, &account_pass) { + Ok(_) => Ok(Value::Bool(true)), + Err(_) => Ok(Value::Bool(false)), + } + }) + } +} diff --git a/rpc/src/v1/impls/web3.rs b/rpc/src/v1/impls/web3.rs index 4d31f73ae..64a82adb9 100644 --- a/rpc/src/v1/impls/web3.rs +++ b/rpc/src/v1/impls/web3.rs @@ -30,9 +30,7 @@ impl Web3Client { impl Web3 for Web3Client { fn client_version(&self, params: Params) -> Result { match params { - Params::None => { - Ok(Value::String(version().to_owned().replace("Parity/", "Parity//"))), - } + Params::None => 
Ok(Value::String(version().to_owned().replace("Parity/", "Parity//"))), _ => Err(Error::invalid_params()) } } diff --git a/rpc/src/v1/mod.rs b/rpc/src/v1/mod.rs index 01635e872..104a8b3f0 100644 --- a/rpc/src/v1/mod.rs +++ b/rpc/src/v1/mod.rs @@ -15,7 +15,7 @@ // along with Parity. If not, see . //! Ethcore rpc v1. -//! +//! //! Compliant with ethereum rpc. pub mod traits; @@ -23,6 +23,7 @@ mod impls; mod types; #[cfg(test)] mod tests; +mod helpers; -pub use self::traits::{Web3, Eth, EthFilter, Net}; +pub use self::traits::{Web3, Eth, EthFilter, Personal, Net}; pub use self::impls::*; diff --git a/rpc/src/v1/traits/mod.rs b/rpc/src/v1/traits/mod.rs index c9af6dac3..0a95cb050 100644 --- a/rpc/src/v1/traits/mod.rs +++ b/rpc/src/v1/traits/mod.rs @@ -23,7 +23,9 @@ macro_rules! rpc_unimplemented { pub mod web3; pub mod eth; pub mod net; +pub mod personal; pub use self::web3::Web3; pub use self::eth::{Eth, EthFilter}; pub use self::net::Net; +pub use self::personal::Personal; diff --git a/rpc/src/v1/traits/personal.rs b/rpc/src/v1/traits/personal.rs new file mode 100644 index 000000000..0cf72951c --- /dev/null +++ b/rpc/src/v1/traits/personal.rs @@ -0,0 +1,41 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +//! Personal rpc interface. +use std::sync::Arc; +use jsonrpc_core::*; + +/// Personal rpc interface. +pub trait Personal: Sized + Send + Sync + 'static { + + /// Lists all stored accounts + fn accounts(&self, _: Params) -> Result { rpc_unimplemented!() } + + /// Creates new account (it becomes new current unlocked account) + fn new_account(&self, _: Params) -> Result { rpc_unimplemented!() } + + /// Unlocks specified account for use (can only be one unlocked account at one moment) + fn unlock_account(&self, _: Params) -> Result { rpc_unimplemented!() } + + /// Should be used to convert object to io delegate. + fn to_delegate(self) -> IoDelegate { + let mut delegate = IoDelegate::new(Arc::new(self)); + delegate.add_method("personal_listAccounts", Personal::accounts); + delegate.add_method("personal_newAccount", Personal::new_account); + delegate.add_method("personal_unlockAccount", Personal::unlock_account); + delegate + } +} diff --git a/rpc/src/v1/types/mod.rs b/rpc/src/v1/types/mod.rs index 34c1f1cff..adf9be071 100644 --- a/rpc/src/v1/types/mod.rs +++ b/rpc/src/v1/types/mod.rs @@ -14,22 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . 
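The new `Personal` trait follows the same delegate pattern as `Web3`, `Eth`, and `Net`: every method has a default body answering `rpc_unimplemented!()`, and `to_delegate` registers the overridden methods under their wire names (`personal_listAccounts`, `personal_newAccount`, `personal_unlockAccount`). The sketch below shows that name-to-method dispatch idea with hand-rolled stand-ins for jsonrpc_core's `Params`, `Value`, and `IoDelegate`; it is a conceptual model of the pattern, not that crate's implementation:

    use std::collections::HashMap;
    use std::sync::Arc;

    type Params = Vec<String>;            // stand-in for jsonrpc_core::Params
    type Value = String;                  // stand-in for jsonrpc_core::Value
    type Method<T> = fn(&T, Params) -> Result<Value, String>;

    struct Delegate<T> {
        inner: Arc<T>,
        methods: HashMap<&'static str, Method<T>>, // wire name -> handler
    }

    impl<T> Delegate<T> {
        fn new(inner: Arc<T>) -> Self {
            Delegate { inner, methods: HashMap::new() }
        }
        fn add_method(&mut self, name: &'static str, m: Method<T>) {
            self.methods.insert(name, m);
        }
        // The server side: route an incoming call by its method name.
        fn call(&self, name: &str, params: Params) -> Result<Value, String> {
            match self.methods.get(name) {
                Some(m) => m(&self.inner, params),
                None => Err(format!("unknown method {}", name)),
            }
        }
    }

    struct PersonalSketch;
    impl PersonalSketch {
        fn accounts(&self, _params: Params) -> Result<Value, String> {
            Ok("[]".to_owned()) // a toy store with no accounts
        }
    }

    fn main() {
        let mut delegate = Delegate::new(Arc::new(PersonalSketch));
        delegate.add_method("personal_listAccounts", PersonalSketch::accounts);
        assert_eq!(delegate.call("personal_listAccounts", vec![]).unwrap(), "[]");
        assert!(delegate.call("personal_missing", vec![]).is_err());
    }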
-mod block; -mod block_number; -mod bytes; -mod filter; -mod index; -mod log; -mod optionals; -mod sync; -mod transaction; +#[cfg(feature = "serde_macros")] +include!("mod.rs.in"); -pub use self::block::{Block, BlockTransactions}; -pub use self::block_number::BlockNumber; -pub use self::bytes::Bytes; -pub use self::filter::Filter; -pub use self::index::Index; -pub use self::log::Log; -pub use self::optionals::OptionalValue; -pub use self::sync::{SyncStatus, SyncInfo}; -pub use self::transaction::Transaction; +#[cfg(not(feature = "serde_macros"))] +include!(concat!(env!("OUT_DIR"), "/mod.rs")); diff --git a/rpc/src/v1/types/mod.rs.in b/rpc/src/v1/types/mod.rs.in new file mode 100644 index 000000000..34c1f1cff --- /dev/null +++ b/rpc/src/v1/types/mod.rs.in @@ -0,0 +1,35 @@ +// Copyright 2015, 2016 Ethcore (UK) Ltd. +// This file is part of Parity. + +// Parity is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity. If not, see . + +mod block; +mod block_number; +mod bytes; +mod filter; +mod index; +mod log; +mod optionals; +mod sync; +mod transaction; + +pub use self::block::{Block, BlockTransactions}; +pub use self::block_number::BlockNumber; +pub use self::bytes::Bytes; +pub use self::filter::Filter; +pub use self::index::Index; +pub use self::log::Log; +pub use self::optionals::OptionalValue; +pub use self::sync::{SyncStatus, SyncInfo}; +pub use self::transaction::Transaction; diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 993f07a65..0097cd47e 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -9,7 +9,7 @@ authors = ["Ethcore { trace!(target: "sync", "New block already in chain {:?}", h); @@ -494,7 +494,10 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - self.last_imported_block = Some(header.number); + if self.current_base_block() < header.number { + self.last_imported_block = Some(header.number); + self.remove_downloaded_blocks(header.number); + } trace!(target: "sync", "New block queued {:?}", h); }, Err(Error::Block(BlockError::UnknownParent(p))) => { @@ -1174,9 +1177,7 @@ impl ChainSync { } /// returns peer ids that have less blocks than our chain - fn get_lagging_peers(&mut self, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { - let chain = io.chain(); - let chain_info = chain.chain_info(); + fn get_lagging_peers(&mut self, chain_info: &BlockChainInfo, io: &SyncIo) -> Vec<(PeerId, BlockNumber)> { let latest_hash = chain_info.best_block_hash; let latest_number = chain_info.best_block_number; self.peers.iter_mut().filter_map(|(&id, ref mut peer_info)| @@ -1195,9 +1196,9 @@ impl ChainSync { } /// propagates latest block to lagging peers - fn propagate_blocks(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { + fn propagate_blocks(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { let updated_peers = { - let lagging_peers = self.get_lagging_peers(io); + let lagging_peers = self.get_lagging_peers(chain_info, io); // sqrt(x)/x scaled to max u32 let fraction = 
(self.peers.len() as f64).powf(-0.5).mul(u32::max_value() as f64).round() as u32; @@ -1214,30 +1215,30 @@ impl ChainSync { for peer_id in updated_peers { let rlp = ChainSync::create_latest_block_rlp(io.chain()); self.send_packet(io, peer_id, NEW_BLOCK_PACKET, rlp); - self.peers.get_mut(&peer_id).unwrap().latest_hash = local_best.clone(); - self.peers.get_mut(&peer_id).unwrap().latest_number = Some(best_number); + self.peers.get_mut(&peer_id).unwrap().latest_hash = chain_info.best_block_hash.clone(); + self.peers.get_mut(&peer_id).unwrap().latest_number = Some(chain_info.best_block_number); sent = sent + 1; } sent } /// propagates new known hashes to all peers - fn propagate_new_hashes(&mut self, local_best: &H256, best_number: BlockNumber, io: &mut SyncIo) -> usize { - let updated_peers = self.get_lagging_peers(io); + fn propagate_new_hashes(&mut self, chain_info: &BlockChainInfo, io: &mut SyncIo) -> usize { + let updated_peers = self.get_lagging_peers(chain_info, io); let mut sent = 0; - let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(local_best.clone())).unwrap()).parent_hash(); + let last_parent = HeaderView::new(&io.chain().block_header(BlockId::Hash(chain_info.best_block_hash.clone())).unwrap()).parent_hash(); for (peer_id, peer_number) in updated_peers { let mut peer_best = self.peers.get(&peer_id).unwrap().latest_hash.clone(); - if best_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { + if chain_info.best_block_number - peer_number > MAX_PEERS_PROPAGATION as BlockNumber { // If we think peer is too far behind just send one latest hash peer_best = last_parent.clone(); } - sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &local_best) { + sent = sent + match ChainSync::create_new_hashes_rlp(io.chain(), &peer_best, &chain_info.best_block_hash) { Some(rlp) => { { let peer = self.peers.get_mut(&peer_id).unwrap(); - peer.latest_hash = local_best.clone(); - peer.latest_number = Some(best_number); + peer.latest_hash = chain_info.best_block_hash.clone(); + peer.latest_number = Some(chain_info.best_block_number); } self.send_packet(io, peer_id, NEW_BLOCK_HASHES_PACKET, rlp); 1 @@ -1257,8 +1258,8 @@ impl ChainSync { pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { let chain = io.chain().chain_info(); if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagate_blocks(&chain.best_block_hash, chain.best_block_number, io); - let hashes = self.propagate_new_hashes(&chain.best_block_hash, chain.best_block_number, io); + let blocks = self.propagate_blocks(&chain, io); + let hashes = self.propagate_new_hashes(&chain, io); if blocks != 0 || hashes != 0 { trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); } @@ -1267,10 +1268,11 @@ impl ChainSync { } /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) { + pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) + // Client should send message after commit to db and inserting to chain. 
.expect("Expected in-chain blocks."); let block = BlockView::new(&block); block.transactions() @@ -1279,14 +1281,14 @@ impl ChainSync { let chain = io.chain(); let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); good.for_each(|txs| { let mut transaction_queue = self.transaction_queue.lock().unwrap(); let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); }); - bad.for_each(|txs| { + retracted.for_each(|txs| { // populate sender for tx in &txs { let _sender = tx.sender(); @@ -1435,12 +1437,13 @@ mod tests { #[test] fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); + let chain_info = client.chain_info(); let io = TestIo::new(&mut client, &mut queue, None); - let lagging_peers = sync.get_lagging_peers(&io); + let lagging_peers = sync.get_lagging_peers(&chain_info, &io); assert_eq!(1, lagging_peers.len()) } @@ -1448,7 +1451,7 @@ mod tests { #[test] fn calculates_tree_for_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(15, BlocksWith::Uncle); + client.add_blocks(15, EachBlockWith::Uncle); let start = client.block_hash_delta_minus(4); let end = client.block_hash_delta_minus(2); @@ -1465,14 +1468,13 @@ mod tests { #[test] fn sends_new_hashes_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - let best_hash = client.chain_info().best_block_hash.clone(); - let best_number = client.chain_info().best_block_number; + let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - let peer_count = sync.propagate_new_hashes(&best_hash, best_number, &mut io); + let peer_count = sync.propagate_new_hashes(&chain_info, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1485,14 +1487,12 @@ mod tests { #[test] fn sends_latest_block_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - let best_hash = client.chain_info().best_block_hash.clone(); - let best_number = client.chain_info().best_block_number; + let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - - let peer_count = sync.propagate_blocks(&best_hash, best_number, &mut io); + let peer_count = sync.propagate_blocks(&chain_info, &mut io); // 1 message should be send assert_eq!(1, io.queue.len()); @@ -1505,7 +1505,7 @@ mod tests { #[test] fn handles_peer_new_block_mallformed() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let block_data = get_dummy_block(11, client.chain_info().best_block_hash); @@ -1523,7 +1523,7 @@ mod tests { #[test] fn handles_peer_new_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, 
EachBlockWith::Uncle); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); @@ -1541,7 +1541,7 @@ mod tests { #[test] fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1557,7 +1557,7 @@ mod tests { #[test] fn handles_peer_new_hashes() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1573,7 +1573,7 @@ mod tests { #[test] fn handles_peer_new_hashes_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1591,14 +1591,13 @@ mod tests { #[test] fn hashes_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - let best_hash = client.chain_info().best_block_hash.clone(); - let best_number = client.chain_info().best_block_number; + let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagate_new_hashes(&best_hash, best_number, &mut io); + sync.propagate_new_hashes(&chain_info, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1610,14 +1609,13 @@ mod tests { #[test] fn block_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - let best_hash = client.chain_info().best_block_hash.clone(); - let best_number = client.chain_info().best_block_number; + let chain_info = client.chain_info(); let mut io = TestIo::new(&mut client, &mut queue, None); - sync.propagate_blocks(&best_hash, best_number, &mut io); + sync.propagate_blocks(&chain_info, &mut io); let data = &io.queue[0].data.clone(); let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); @@ -1628,13 +1626,13 @@ mod tests { fn should_add_transactions_to_queue() { // given let mut client = TestBlockChainClient::new(); - client.add_blocks(98, BlocksWith::Uncle); - client.add_blocks(1, BlocksWith::UncleAndTransaction); - client.add_blocks(1, BlocksWith::Transaction); + client.add_blocks(98, EachBlockWith::Uncle); + client.add_blocks(1, EachBlockWith::UncleAndTransaction); + client.add_blocks(1, EachBlockWith::Transaction); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let good_blocks = vec![client.block_hash_delta_minus(2)]; - let bad_blocks = vec![client.block_hash_delta_minus(1)]; + let retracted_blocks = vec![client.block_hash_delta_minus(1)]; let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1643,7 
+1641,7 @@ mod tests { sync.chain_new_blocks(&io, &[], &good_blocks); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &bad_blocks); + sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); // then let status = sync.transaction_queue.lock().unwrap().status(); @@ -1654,7 +1652,7 @@ mod tests { #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1678,7 +1676,7 @@ mod tests { #[test] fn returns_requested_block_headers_reverse() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 44f3f02e0..d67a09f3b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -157,9 +157,9 @@ impl NetworkProtocolHandler for EthSync { SyncMessage::BlockVerified => { self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); }, - SyncMessage::NewChainBlocks { ref good, ref bad } => { + SyncMessage::NewChainBlocks { ref good, ref retracted } => { let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad); + self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); } } } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index b388f508d..58f50916e 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -24,8 +24,8 @@ use super::helpers::*; fn two_peers() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); @@ -35,8 +35,8 @@ fn two_peers() { fn status_after_sync() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); @@ -45,8 +45,8 @@ fn status_after_sync() { #[test] fn takes_few_steps() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(100, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); let total_steps = net.sync(); assert!(total_steps < 7); } @@ -56,7 +56,7 @@ fn empty_blocks() { ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { - let with = if n % 2 == 0 { BlocksWith::Nothing } else { BlocksWith::Uncle }; + let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle }; 
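+ // Alternate between empty and uncle-bearing batches so both code paths are exercised.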
net.peer_mut(1).chain.add_blocks(5, with.clone()); net.peer_mut(2).chain.add_blocks(5, with); } @@ -69,14 +69,14 @@ fn empty_blocks() { fn forked() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(0).chain.add_blocks(100, BlocksWith::Nothing); //fork - net.peer_mut(1).chain.add_blocks(200, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(200, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, BlocksWith::Nothing); + net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork + net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); @@ -88,8 +88,8 @@ fn forked() { #[test] fn restart() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync_steps(8); @@ -110,8 +110,8 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(1, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle); net.start(); @@ -124,10 +124,10 @@ fn status_packet() { #[test] fn propagate_hashes() { let mut net = TestNet::new(6); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -150,10 +150,10 @@ fn propagate_hashes() { #[test] fn propagate_blocks() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -165,7 +165,7 @@ fn propagate_blocks() { #[test] fn restart_on_malformed_block() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.peer_mut(1).chain.corrupt_block(6); net.sync_steps(10); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index a7a5bcb6d..5b53ad90b 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -35,7 +35,7 @@ pub struct TestBlockChainClient { } #[derive(Clone)] 
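+/// Specifies what to include in each block generated by `TestBlockChainClient::add_blocks`.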
-pub enum BlocksWith { +pub enum EachBlockWith { Nothing, Uncle, Transaction, @@ -52,12 +52,12 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), }; - client.add_blocks(1, BlocksWith::Nothing); // add genesis block + client.add_blocks(1, EachBlockWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().unwrap().clone(); client } - pub fn add_blocks(&mut self, count: usize, with: BlocksWith) { + pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); @@ -65,7 +65,7 @@ impl TestBlockChainClient { header.parent_hash = self.last_hash.read().unwrap().clone(); header.number = n as BlockNumber; let uncles = match with { - BlocksWith::Uncle | BlocksWith::UncleAndTransaction => { + EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { let mut uncles = RlpStream::new_list(1); let mut uncle_header = BlockHeader::new(); uncle_header.difficulty = From::from(n); @@ -78,7 +78,7 @@ impl TestBlockChainClient { _ => RlpStream::new_list(0) }; let txs = match with { - BlocksWith::Transaction | BlocksWith::UncleAndTransaction => { + EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { let mut txs = RlpStream::new_list(1); let keypair = KeyPair::create().unwrap(); let tx = Transaction { @@ -136,6 +136,10 @@ impl BlockChainClient for TestBlockChainClient { Some(U256::zero()) } + fn block_hash(&self, _id: BlockId) -> Option<H256> { + unimplemented!(); + } + fn nonce(&self, _address: &Address) -> U256 { U256::zero() } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 83665dfda..8b38c64ad 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -108,27 +108,29 @@ struct TransactionSet { } impl TransactionSet { - fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) { + fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option<TransactionOrder> { self.by_priority.insert(order.clone()); - self.by_address.insert(sender, nonce, order); + self.by_address.insert(sender, nonce, order) } - fn enforce_limit(&mut self, by_hash: &HashMap<H256, VerifiedTransaction>) { + fn enforce_limit(&mut self, by_hash: &mut HashMap<H256, VerifiedTransaction>) { let len = self.by_priority.len(); if len <= self.limit { return; } - let to_drop : Vec<&VerifiedTransaction> = { + let to_drop : Vec<(Address, U256)> = { self.by_priority .iter() .skip(self.limit) .map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected.")) + .map(|tx| (tx.sender(), tx.nonce())) .collect() }; - for tx in to_drop { - self.drop(&tx.sender(), &tx.nonce()); + for (sender, nonce) in to_drop { + let order = self.drop(&sender, &nonce).expect("Dropping transaction found in priority queue failed."); + by_hash.remove(&order.hash).expect("Inconsistency in queue."); } } @@ -236,26 +238,50 @@ impl TransactionQueue { // We don't know this transaction return; } + let transaction = transaction.unwrap(); let sender = transaction.sender(); let nonce = transaction.nonce(); + let current_nonce = fetch_nonce(&sender); // Remove from future - self.future.drop(&sender, &nonce); - - // Remove from current - let order = self.current.drop(&sender, &nonce); - if order.is_none() { + let order = self.future.drop(&sender, &nonce); + if order.is_some() { + self.recalculate_future_for_sender(&sender, current_nonce); + // And now let's check if there is some chain of transactions in future + // that should be placed in
current + self.move_future_txs(sender.clone(), current_nonce, current_nonce); return; } - // Let's remove transactions where tx.nonce < current_nonce - // and if there are any future transactions matching current_nonce+1 - move to current - let current_nonce = fetch_nonce(&sender); - // We will either move transaction to future or remove it completely - // so there will be no transactions from this sender in current - self.last_nonces.remove(&sender); + // Remove from current + let order = self.current.drop(&sender, &nonce); + if order.is_some() { + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); + // This should move all current transactions to future and remove old transactions + self.move_all_to_future(&sender, current_nonce); + // And now let's check if there is some chain of transactions in future + // that should be placed in current. It should also update last_nonces. + self.move_future_txs(sender.clone(), current_nonce, current_nonce); + return; + } + } + fn recalculate_future_for_sender(&mut self, sender: &Address, current_nonce: U256) { + // We need to drain all transactions for current sender from future and reinsert them with updated height + let all_nonces_from_sender = match self.future.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(), + None => vec![], + }; + for k in all_nonces_from_sender { + let order = self.future.drop(&sender, &k).unwrap(); + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } + } + + fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) { let all_nonces_from_sender = match self.current.by_address.row(&sender) { Some(row_map) => row_map.keys().cloned().collect::<Vec<U256>>(), None => vec![], @@ -270,7 +296,7 @@ self.by_hash.remove(&order.hash); } } - self.future.enforce_limit(&self.by_hash); + self.future.enforce_limit(&mut self.by_hash); // And now let's check if there is some chain of transactions in future // that should be placed in current @@ -279,6 +305,7 @@ } + /// Returns top transactions from the queue pub fn top_transactions(&self, size: usize) -> Vec<SignedTransaction> { self.current.by_priority @@ -297,11 +324,11 @@ self.last_nonces.clear(); } - fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option<U256> { + fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { { let by_nonce = self.future.by_address.row_mut(&address); if let None = by_nonce { - return None; + return; } let mut by_nonce = by_nonce.unwrap(); while let Some(order) = by_nonce.remove(&current_nonce) { @@ -314,47 +341,69 @@ } } self.future.by_address.clear_if_empty(&address); - // Returns last inserted nonce - Some(current_nonce - U256::one()) + // Update last inserted nonce + self.last_nonces.insert(address, current_nonce - U256::one()); } fn import_tx<T>(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { + + if self.by_hash.get(&tx.hash()).is_some() { + // Transaction is already imported.
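+ // A re-broadcast of a known hash is dropped here rather than re-queued.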
+ trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash()); + return; + } + let nonce = tx.nonce(); let address = tx.sender(); + let state_nonce = fetch_nonce(&address); let next_nonce = self.last_nonces .get(&address) .cloned() - .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); + .map_or(state_nonce, |n| n + U256::one()); // Check height if nonce > next_nonce { - let order = TransactionOrder::for_transaction(&tx, next_nonce); - // Insert to by_hash - self.by_hash.insert(tx.hash(), tx); // We have a gap - put to future - self.future.insert(address, nonce, order); - self.future.enforce_limit(&self.by_hash); + Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); + self.future.enforce_limit(&mut self.by_hash); return; - } else if next_nonce > nonce { + } else if nonce < state_nonce { // Dropping transaction trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } let base_nonce = fetch_nonce(&address); - let order = TransactionOrder::for_transaction(&tx, base_nonce); - // Insert to by_hash - self.by_hash.insert(tx.hash(), tx); - // Insert to current - self.current.insert(address.clone(), nonce, order); + Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); + self.last_nonces.insert(address.clone(), nonce); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); - self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); - // Enforce limit - self.current.enforce_limit(&self.by_hash); + self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); + self.current.enforce_limit(&mut self.by_hash); + } + + fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap<H256, VerifiedTransaction>) { + let order = TransactionOrder::for_transaction(&tx, base_nonce); + let hash = tx.hash(); + let address = tx.sender(); + let nonce = tx.nonce(); + + by_hash.insert(hash.clone(), tx); + if let Some(old) = set.insert(address, nonce, order.clone()) { + // There was already a transaction in the queue. 
Let's check which one should stay + if old.cmp(&order) == Ordering::Greater { + assert!(old.nonce_height == order.nonce_height, "Both transactions should have the same height."); + // Put back old transaction since it has greater priority (higher gas_price) + set.insert(address, nonce, old); + by_hash.remove(&hash); + } else { + // Make sure we remove old transaction entirely + set.by_priority.remove(&old); + by_hash.remove(&old.hash); + } + } } } @@ -363,6 +412,7 @@ impl TransactionQueue { mod test { extern crate rustc_serialize; use self::rustc_serialize::hex::FromHex; + use std::ops::Deref; use std::collections::{HashMap, BTreeSet}; use util::crypto::KeyPair; use util::numbers::{U256, Uint}; @@ -413,7 +463,7 @@ mod test { let (tx1, tx2) = new_txs(U256::from(1)); let tx1 = VerifiedTransaction::new(tx1); let tx2 = VerifiedTransaction::new(tx2); - let by_hash = { + let mut by_hash = { let mut x = HashMap::new(); let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); @@ -430,9 +480,10 @@ mod test { assert_eq!(set.by_address.len(), 2); // when - set.enforce_limit(&by_hash); + set.enforce_limit(&mut by_hash); // then + assert_eq!(by_hash.len(), 1); assert_eq!(set.by_priority.len(), 1); assert_eq!(set.by_address.len(), 1); assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1); @@ -633,7 +684,26 @@ mod test { } #[test] - fn should_accept_same_transaction_twice() { + fn should_not_insert_same_transaction_twice() { + // given + let nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (_tx1, tx2) = new_txs(U256::from(1)); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + assert_eq!(txq.status().pending, 0); + + // when + txq.add(tx2.clone(), &nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 1); + assert_eq!(stats.pending, 0); + } + + #[test] + fn should_accept_same_transaction_twice_if_removed() { // given let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::from(1)); @@ -675,4 +745,78 @@ mod test { assert_eq!(stats.pending, 2); } + #[test] + fn should_replace_same_transaction_when_has_higher_fee() { + // given + let mut txq = TransactionQueue::new(); + let keypair = KeyPair::create().unwrap(); + let tx = new_unsigned_tx(U256::from(123)).sign(&keypair.secret()); + let tx2 = { + let mut tx2 = tx.deref().clone(); + tx2.gas_price = U256::from(200); + tx2.sign(&keypair.secret()) + }; + + // when + txq.add(tx, &default_nonce); + txq.add(tx2, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 1); + assert_eq!(stats.future, 0); + assert_eq!(txq.top_transactions(1)[0].gas_price, U256::from(200)); + } + + #[test] + fn should_replace_same_transaction_when_importing_to_futures() { + // given + let mut txq = TransactionQueue::new(); + let keypair = KeyPair::create().unwrap(); + let tx0 = new_unsigned_tx(U256::from(123)).sign(&keypair.secret()); + let tx1 = { + let mut tx1 = tx0.deref().clone(); + tx1.nonce = U256::from(124); + tx1.sign(&keypair.secret()) + }; + let tx2 = { + let mut tx2 = tx1.deref().clone(); + tx2.gas_price = U256::from(200); + tx2.sign(&keypair.secret()) + }; + + // when + txq.add(tx1, &default_nonce); + txq.add(tx2, &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx0, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); + assert_eq!(txq.top_transactions(2)[1].gas_price, 
U256::from(200)); + } + + #[test] + fn should_recalculate_height_when_removing_from_future() { + // given + let previous_nonce = |a: &Address| default_nonce(a) - U256::one(); + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (tx1, tx2) = new_txs(U256::one()); + txq.add(tx1.clone(), &previous_nonce); + txq.add(tx2, &previous_nonce); + assert_eq!(txq.status().future, 2); + + // when + txq.remove(&tx1.hash(), &next_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 1); + } + + } diff --git a/util/Cargo.toml b/util/Cargo.toml index 0c7df3f40..9c5cb3fe3 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -36,6 +36,7 @@ libc = "0.2.7" vergen = "0.1" target_info = "0.1" bigint = { path = "bigint" } +chrono = "0.2" [features] default = [] diff --git a/util/benches/bigint.rs b/util/benches/bigint.rs index fc41ab628..575164cb6 100644 --- a/util/benches/bigint.rs +++ b/util/benches/bigint.rs @@ -28,7 +28,7 @@ extern crate ethcore_util; extern crate rand; use test::{Bencher, black_box}; -use ethcore_util::uint::*; +use ethcore_util::numbers::*; #[bench] fn u256_add(b: &mut Bencher) { diff --git a/util/benches/rlp.rs b/util/benches/rlp.rs index e94cb3635..4a983f369 100644 --- a/util/benches/rlp.rs +++ b/util/benches/rlp.rs @@ -28,7 +28,7 @@ extern crate ethcore_util; use test::Bencher; use std::str::FromStr; use ethcore_util::rlp::*; -use ethcore_util::uint::U256; +use ethcore_util::numbers::U256; #[bench] fn bench_stream_u64_value(b: &mut Bencher) { diff --git a/util/bigint/src/uint.rs b/util/bigint/src/uint.rs index a70997dc5..bd57e9d6d 100644 --- a/util/bigint/src/uint.rs +++ b/util/bigint/src/uint.rs @@ -778,6 +778,35 @@ macro_rules! construct_uint { } } + impl serde::Deserialize for $name { + fn deserialize<D>(deserializer: &mut D) -> Result<$name, D::Error> + where D: serde::Deserializer { + struct UintVisitor; + + impl serde::de::Visitor for UintVisitor { + type Value = $name; + + fn visit_str<E>(&mut self, value: &str) -> Result<Self::Value, E> where E: serde::Error { + // 0x + len + if value.len() != 2 + $n_words / 8 { + return Err(serde::Error::custom("Invalid length.")); + } + + match $name::from_str(&value[2..]) { + Ok(val) => Ok(val), + Err(_) => { return Err(serde::Error::custom("Invalid length.")); } + } + } + + fn visit_string<E>(&mut self, value: String) -> Result<Self::Value, E> where E: serde::Error { + self.visit_str(value.as_ref()) + } + } + + deserializer.deserialize(UintVisitor) + } + } + impl From<u64> for $name { fn from(value: u64) -> $name { let mut ret = [0; $n_words]; diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 5e6ca47c2..01e53f819 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -25,7 +25,10 @@ use kvdb::{Database, DBTransaction, DatabaseConfig}; use std::env; /// Implementation of the HashDB trait for a disk-backed database with a memory overlay -/// and latent-removal semantics. +/// and, possibly, latent-removal semantics. +/// +/// If `counters` is `None`, then it behaves exactly like OverlayDB. If not, it behaves +/// differently: /// /// Like OverlayDB, there is a memory overlay; `commit()` must be called in order to /// write operations out to disk. 
Unlike OverlayDB, `remove()` operations do not take effect @@ -34,7 +37,7 @@ use std::env; pub struct JournalDB { overlay: MemoryDB, backing: Arc<Database>, - counters: Arc<RwLock<HashMap<H256, i32>>>, + counters: Option<Arc<RwLock<HashMap<H256, i32>>>>, } impl Clone for JournalDB { @@ -48,10 +51,11 @@ } // all keys must be at least 12 bytes -const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const LATEST_ERA_KEY : [u8; 12] = [ b'l', b'a', b's', b't', 0, 0, 0, 0, 0, 0, 0, 0 ]; +const VERSION_KEY : [u8; 12] = [ b'j', b'v', b'e', b'r', 0, 0, 0, 0, 0, 0, 0, 0 ]; -const DB_VERSION: u32 = 3; +const DB_VERSION : u32 = 3; +const DB_VERSION_NO_JOURNAL : u32 = 3 + 256; const PADDING : [u8; 10] = [ 0u8; 10 ]; @@ -59,25 +63,38 @@ impl JournalDB { /// Create a new instance from file pub fn new(path: &str) -> JournalDB { + Self::from_prefs(path, true) + } + + /// Create a new instance from file + pub fn from_prefs(path: &str, prefer_journal: bool) -> JournalDB { let opts = DatabaseConfig { prefix_size: Some(12) //use 12 bytes as prefix, this must match account_db prefix }; let backing = Database::open(&opts, path).unwrap_or_else(|e| { panic!("Error opening state db: {}", e); }); + let with_journal; if !backing.is_empty() { match backing.get(&VERSION_KEY).map(|d| d.map(|v| decode::<u32>(&v))) { - Ok(Some(DB_VERSION)) => {}, + Ok(Some(DB_VERSION)) => { with_journal = true; }, + Ok(Some(DB_VERSION_NO_JOURNAL)) => { with_journal = false; }, v => panic!("Incompatible DB version, expected {}, got {:?}", DB_VERSION, v) } } else { - backing.put(&VERSION_KEY, &encode(&DB_VERSION)).expect("Error writing version to database"); + backing.put(&VERSION_KEY, &encode(&(if prefer_journal { DB_VERSION } else { DB_VERSION_NO_JOURNAL }))).expect("Error writing version to database"); + with_journal = prefer_journal; } - let counters = JournalDB::read_counters(&backing); + + let counters = if with_journal { + Some(Arc::new(RwLock::new(JournalDB::read_counters(&backing)))) + } else { + None + }; JournalDB { overlay: MemoryDB::new(), backing: Arc::new(backing), - counters: Arc::new(RwLock::new(counters)), + counters: counters, } } @@ -94,9 +111,47 @@ impl JournalDB { self.backing.get(&LATEST_ERA_KEY).expect("Low level database error").is_none() } + /// Commit all recent insert operations. + pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { + let have_counters = self.counters.is_some(); + if have_counters { + self.commit_with_counters(now, id, end) + } else { + self.commit_without_counters() + } + } + + /// Drain the overlay and place it into a batch for the DB. + fn batch_overlay_insertions(overlay: &mut MemoryDB, batch: &DBTransaction) -> usize { + let mut inserts = 0usize; + let mut deletes = 0usize; + for i in overlay.drain().into_iter() { + let (key, (value, rc)) = i; + if rc > 0 { + assert!(rc == 1); + batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); + inserts += 1; + } + if rc < 0 { + assert!(rc == -1); + deletes += 1; + } + } + trace!("commit: Inserted {}, Deleted {} nodes", inserts, deletes); + inserts + deletes + } + + /// Just commit the overlay into the backing DB. 
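+ /// This is the path taken when the database was opened without a journal (`counters` is `None`).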
+ fn commit_without_counters(&mut self) -> Result<u32, UtilError> { + let batch = DBTransaction::new(); + let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); + try!(self.backing.write(batch)); + Ok(ret as u32) + } + /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { + fn commit_with_counters(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result<u32, UtilError> { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] // [era, 1] => [ id, [insert_0, ...], [remove_0, ...] ] @@ -121,21 +176,30 @@ impl JournalDB { // and the key is safe to delete. // record new commit's details. - debug!("commit: #{} ({}), end era: {:?}", now, id, end); + trace!("commit: #{} ({}), end era: {:?}", now, id, end); + let mut counters = self.counters.as_ref().unwrap().write().unwrap(); let batch = DBTransaction::new(); - let mut counters = self.counters.write().unwrap(); { let mut index = 0usize; let mut last; - while try!(self.backing.get({ - let mut r = RlpStream::new_list(3); - r.append(&now); - r.append(&index); - r.append(&&PADDING[..]); - last = r.drain(); - &last - })).is_some() { + while { + let record = try!(self.backing.get({ + let mut r = RlpStream::new_list(3); + r.append(&now); + r.append(&index); + r.append(&&PADDING[..]); + last = r.drain(); + &last + })); + match record { + Some(r) => { + assert!(&Rlp::new(&r).val_at::<H256>(0) != id); + true + }, + None => false, + } + } { index += 1; } @@ -181,6 +245,7 @@ trace!("Purging nodes inserted in non-canon: {:?}", inserts); to_remove.append(&mut inserts); } + trace!("commit: Delete journal for time #{}.{}: {}, (canon was {}): {} entries", end_era, index, rlp.val_at::<H256>(0), canon_id, to_remove.len()); try!(batch.delete(&last)); index += 1; } @@ -188,33 +253,18 @@ let canon_inserts = canon_inserts.drain(..).collect::<HashSet<_>>(); // Purge removed keys if they are not referenced and not re-inserted in the canon commit let mut deletes = 0; + trace!("Purging filtered nodes: {:?}", to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)).collect::<Vec<_>>()); for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { try!(batch.delete(&h)); deletes += 1; } - debug!("commit: Delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes); + trace!("Total nodes purged: {}", deletes); } // Commit overlay insertions - let mut ret = 0u32; - let mut deletes = 0usize; - for i in self.overlay.drain().into_iter() { - let (key, (value, rc)) = i; - if rc > 0 { - assert!(rc == 1); - batch.put(&key.bytes(), &value).expect("Low-level database error. Some issue with your hard disk?"); - ret += 1; - } - if rc < 0 { - assert!(rc == -1); - ret += 1; - deletes += 1; - } - } - + let ret = Self::batch_overlay_insertions(&mut self.overlay, &batch); try!(self.backing.write(batch)); - debug!("commit: Deleted {} nodes", deletes); - Ok(ret) + Ok(ret as u32) } @@ -262,7 +312,7 @@ era -= 1; } } - debug!("Recovered {} counters", res.len()); + trace!("Recovered {} counters", res.len()); res } } diff --git a/util/src/keys/directory.rs b/util/src/keys/directory.rs index 7178508a2..d0d3393cd 100644 --- a/util/src/keys/directory.rs +++ b/util/src/keys/directory.rs @@ -461,17 +461,17 @@ enum KeyFileLoadError { pub struct KeyDirectory { /// Directory path for key management. 
path: String, - cache: RefCell<HashMap<Uuid, KeyFileContent>>, - cache_usage: RefCell<VecDeque<Uuid>>, + cache: RwLock<HashMap<Uuid, KeyFileContent>>, + cache_usage: RwLock<VecDeque<Uuid>>, } impl KeyDirectory { /// Initializes new cache directory context with a given `path` pub fn new(path: &Path) -> KeyDirectory { KeyDirectory { - cache: RefCell::new(HashMap::new()), + cache: RwLock::new(HashMap::new()), path: path.to_str().expect("Initialized key directory with empty path").to_owned(), - cache_usage: RefCell::new(VecDeque::new()), + cache_usage: RwLock::new(VecDeque::new()), } } @@ -484,7 +484,7 @@ impl KeyDirectory { let json_bytes = json_text.into_bytes(); try!(file.write(&json_bytes)); } - let mut cache = self.cache.borrow_mut(); + let mut cache = self.cache.write().unwrap(); let id = key_file.id.clone(); cache.insert(id.clone(), key_file); Ok(id.clone()) @@ -495,14 +495,14 @@ pub fn get(&self, id: &Uuid) -> Option<KeyFileContent> { let path = self.key_path(id); { - let mut usage = self.cache_usage.borrow_mut(); + let mut usage = self.cache_usage.write().unwrap(); usage.push_back(id.clone()); } - if !self.cache.borrow().contains_key(id) { + if !self.cache.read().unwrap().contains_key(id) { match KeyDirectory::load_key(&path) { Ok(loaded_key) => { - self.cache.borrow_mut().insert(id.to_owned(), loaded_key); + self.cache.write().unwrap().insert(id.to_owned(), loaded_key); } Err(error) => { warn!(target: "sstore", "error loading key {:?}: {:?}", id, error); @@ -512,7 +512,7 @@ } // todo: replace with Ref::map when it stabilized to avoid copies - Some(self.cache.borrow().get(id) + Some(self.cache.read().unwrap().get(id) .expect("Key should be there, we have just inserted or checked it.") .clone()) } @@ -524,7 +524,7 @@ /// Removes keys that have never been requested during last `MAX_USAGE_TRACK` times pub fn collect_garbage(&mut self) { - let mut cache_usage = self.cache_usage.borrow_mut(); + let mut cache_usage = self.cache_usage.write().unwrap(); let total_usages = cache_usage.len(); let untracked_usages = max(total_usages as i64 - MAX_CACHE_USAGE_TRACK as i64, 0) as usize; @@ -532,31 +532,31 @@ cache_usage.drain(..untracked_usages); } - if self.cache.borrow().len() <= MAX_CACHE_USAGE_TRACK { return; } + if self.cache.read().unwrap().len() <= MAX_CACHE_USAGE_TRACK { return; } let uniqs: HashSet<&Uuid> = cache_usage.iter().collect(); let removes:Vec<Uuid> = { - let cache = self.cache.borrow(); + let cache = self.cache.read().unwrap(); cache.keys().cloned().filter(|key| !uniqs.contains(key)).collect() }; if removes.is_empty() { return; } - let mut cache = self.cache.borrow_mut(); + let mut cache = self.cache.write().unwrap(); for key in removes { cache.remove(&key); } } /// Reports how many keys are currently cached. 
pub fn cache_size(&self) -> usize { - self.cache.borrow().len() + self.cache.read().unwrap().len() } /// Removes key file from key directory pub fn delete(&mut self, id: &Uuid) -> Result<(), ::std::io::Error> { let path = self.key_path(id); - if !self.cache.borrow().contains_key(id) { + if !self.cache.read().unwrap().contains_key(id) { return match fs::remove_file(&path) { Ok(_) => { - self.cache.borrow_mut().remove(&id); + self.cache.write().unwrap().remove(&id); Ok(()) }, Err(e) => Err(e) diff --git a/util/src/keys/store.rs b/util/src/keys/store.rs index c4fa377f9..625d6fd8f 100644 --- a/util/src/keys/store.rs +++ b/util/src/keys/store.rs @@ -22,6 +22,7 @@ use rcrypto::pbkdf2::*; use rcrypto::scrypt::*; use rcrypto::hmac::*; use crypto; +use chrono::*; const KEY_LENGTH: u32 = 32; const KEY_ITERATIONS: u32 = 10240; @@ -55,9 +56,26 @@ pub enum EncryptedHashMapError { InvalidValueFormat(FromBytesError), } +/// Error signing a message with an account +#[derive(Debug)] +pub enum SigningError { + /// Account passed does not exist + NoAccount, + /// Account passed is not unlocked + AccountNotUnlocked, + /// Invalid secret in store + InvalidSecret } /// Represent service for storing encrypted arbitrary data pub struct SecretStore { - directory: KeyDirectory + directory: KeyDirectory, + unlocks: RwLock<HashMap<Address, AccountUnlock>>, } + +struct AccountUnlock { + secret: H256, + expires: DateTime<UTC>, } impl SecretStore { @@ -72,7 +90,8 @@ /// new instance of Secret Store in specific directory pub fn new_in(path: &Path) -> SecretStore { SecretStore { - directory: KeyDirectory::new(path) + directory: KeyDirectory::new(path), + unlocks: RwLock::new(HashMap::new()), } } @@ -86,7 +105,7 @@ import_path.push(".ethereum"); import_path.push("keystore"); if let Err(e) = geth_import::import_geth_keys(self, &import_path) { - warn!(target: "sstore", "Error retrieving geth keys: {:?}", e) + trace!(target: "sstore", "Geth key not imported: {:?}", e); } } @@ -120,9 +139,57 @@ #[cfg(test)] fn new_test(path: &::devtools::RandomTempPath) -> SecretStore { SecretStore { - directory: KeyDirectory::new(path.as_path()) + directory: KeyDirectory::new(path.as_path()), + unlocks: RwLock::new(HashMap::new()), } } + + /// Unlocks account for use + pub fn unlock_account(&self, account: &Address, pass: &str) -> Result<(), EncryptedHashMapError> { + let secret_id = try!(self.account(&account).ok_or(EncryptedHashMapError::UnknownIdentifier)); + let secret = try!(self.get(&secret_id, pass)); + { + let mut write_lock = self.unlocks.write().unwrap(); + let mut unlock = write_lock.entry(*account) + .or_insert_with(|| AccountUnlock { secret: secret, expires: UTC::now() }); + unlock.secret = secret; + unlock.expires = UTC::now() + Duration::minutes(20); + } + Ok(()) + } + + /// Creates new account + pub fn new_account(&mut self, pass: &str) -> Result<Address, ::std::io::Error> { + let secret = H256::random(); + let key_id = H128::random(); + self.insert(key_id.clone(), secret, pass); + + let mut key_file = self.directory.get(&key_id).expect("the key was just inserted"); + let address = Address::random(); + key_file.account = Some(address); + try!(self.directory.save(key_file)); + Ok(address) + } + + /// Signs message with unlocked account + pub fn sign(&self, account: &Address, message: &H256) -> Result<crypto::Signature, SigningError> { + let read_lock = self.unlocks.read().unwrap(); + let unlock = try!(read_lock.get(account).ok_or(SigningError::AccountNotUnlocked)); + match crypto::KeyPair::from_secret(unlock.secret) { + Ok(pair) => match pair.sign(message) 
{ Ok(signature) => Ok(signature), + Err(_) => Err(SigningError::InvalidSecret) + }, + Err(_) => Err(SigningError::InvalidSecret) + } + } + + /// Returns secret for unlocked account + pub fn account_secret(&self, account: &Address) -> Result<crypto::Secret, SigningError> { + let read_lock = self.unlocks.read().unwrap(); + let unlock = try!(read_lock.get(account).ok_or(SigningError::AccountNotUnlocked)); + Ok(unlock.secret as crypto::Secret) + } } fn derive_key_iterations(password: &str, salt: &H256, c: u32) -> (Bytes, Bytes) { @@ -369,6 +436,40 @@ mod tests { assert_eq!(4, sstore.directory.list().unwrap().len()) } + #[test] + fn can_create_account() { + let temp = RandomTempPath::create_dir(); + let mut sstore = SecretStore::new_test(&temp); + sstore.new_account("123").unwrap(); + assert_eq!(1, sstore.accounts().unwrap().len()); + } + + #[test] + fn can_unlock_account() { + let temp = RandomTempPath::create_dir(); + let mut sstore = SecretStore::new_test(&temp); + let address = sstore.new_account("123").unwrap(); + + let secret = sstore.unlock_account(&address, "123"); + assert!(secret.is_ok()); + } + + #[test] + fn can_sign_data() { + let temp = RandomTempPath::create_dir(); + let address = { + let mut sstore = SecretStore::new_test(&temp); + sstore.new_account("334").unwrap() + }; + let signature = { + let sstore = SecretStore::new_test(&temp); + sstore.unlock_account(&address, "334").unwrap(); + sstore.sign(&address, &H256::random()).unwrap() + }; + + assert!(signature != x!(0)); + } + #[test] fn can_import_account() { use keys::directory::{KeyFileContent, KeyFileCrypto}; diff --git a/util/src/lib.rs b/util/src/lib.rs index 8594e6f40..a50ba8da4 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -111,6 +111,7 @@ extern crate rustc_version; extern crate target_info; extern crate vergen; extern crate bigint; +extern crate chrono; pub mod standard; #[macro_use] diff --git a/util/src/network/host.rs b/util/src/network/host.rs index 42e8ff93d..f2cc9fe48 100644 --- a/util/src/network/host.rs +++ b/util/src/network/host.rs @@ -400,7 +400,8 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone { // public_endpoint in host info contains local address at this point let listen_address = self.info.read().unwrap().public_endpoint.address.clone(); let udp_port = self.info.read().unwrap().config.udp_port.unwrap_or(listen_address.port()); - let public_endpoint = match self.info.read().unwrap().config.public_address { + let public_address = self.info.read().unwrap().config.public_address.clone(); + let public_endpoint = match public_address { None => { let public_address = select_public_address(listen_address.port()); let local_endpoint = NodeEndpoint { address: public_address, udp_port: udp_port }; diff --git a/util/src/overlaydb.rs b/util/src/overlaydb.rs index f4ed2d5d6..3c80f4148 100644 --- a/util/src/overlaydb.rs +++ b/util/src/overlaydb.rs @@ -146,7 +146,7 @@ impl OverlayDB { }) } - /// Get the refs and value of the given key. + /// Put the refs and value of the given key, possibly deleting it from the db. fn put_payload(&self, key: &H256, payload: (Bytes, u32)) -> bool { if payload.1 > 0 { let mut s = RlpStream::new_list(2);