From 0109e5e9d4fa1110b59d907b32a158cd3b3d5762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 13:03:34 +0100 Subject: [PATCH 01/19] Removing memory leak when transactions are dropped from set --- sync/src/transaction_queue.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 83665dfda..7f9f21638 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -113,22 +113,24 @@ impl TransactionSet { self.by_address.insert(sender, nonce, order); } - fn enforce_limit(&mut self, by_hash: &HashMap) { + fn enforce_limit(&mut self, by_hash: &mut HashMap) { let len = self.by_priority.len(); if len <= self.limit { return; } - let to_drop : Vec<&VerifiedTransaction> = { + let to_drop : Vec<(Address, U256)> = { self.by_priority .iter() .skip(self.limit) .map(|order| by_hash.get(&order.hash).expect("Inconsistency in queue detected.")) + .map(|tx| (tx.sender(), tx.nonce())) .collect() }; - for tx in to_drop { - self.drop(&tx.sender(), &tx.nonce()); + for (sender, nonce) in to_drop { + let order = self.drop(&sender, &nonce).expect("Droping transaction failed."); + by_hash.remove(&order.hash).expect("Inconsistency in queue."); } } @@ -270,7 +272,7 @@ impl TransactionQueue { self.by_hash.remove(&order.hash); } } - self.future.enforce_limit(&self.by_hash); + self.future.enforce_limit(&mut self.by_hash); // And now lets check if there is some chain of transactions in future // that should be placed in current @@ -335,7 +337,7 @@ impl TransactionQueue { self.by_hash.insert(tx.hash(), tx); // We have a gap - put to future self.future.insert(address, nonce, order); - self.future.enforce_limit(&self.by_hash); + self.future.enforce_limit(&mut self.by_hash); return; } else if next_nonce > nonce { // Droping transaction @@ -354,7 +356,7 @@ impl TransactionQueue { let new_last_nonce = self.move_future_txs(address.clone(), nonce + 
U256::one(), base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit - self.current.enforce_limit(&self.by_hash); + self.current.enforce_limit(&mut self.by_hash); } } @@ -413,7 +415,7 @@ mod test { let (tx1, tx2) = new_txs(U256::from(1)); let tx1 = VerifiedTransaction::new(tx1); let tx2 = VerifiedTransaction::new(tx2); - let by_hash = { + let mut by_hash = { let mut x = HashMap::new(); let tx1 = VerifiedTransaction::new(tx1.transaction.clone()); let tx2 = VerifiedTransaction::new(tx2.transaction.clone()); @@ -430,9 +432,10 @@ mod test { assert_eq!(set.by_address.len(), 2); // when - set.enforce_limit(&by_hash); + set.enforce_limit(&mut by_hash); // then + assert_eq!(by_hash.len(), 1); assert_eq!(set.by_priority.len(), 1); assert_eq!(set.by_address.len(), 1); assert_eq!(set.by_priority.iter().next().unwrap().clone(), order1); From 78a39d3ac9b360d59c7acb430182e3fe35c0e096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 14:34:15 +0100 Subject: [PATCH 02/19] Avoid importing same transaction twice (especially with different nonce_height) --- sync/src/transaction_queue.rs | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 7f9f21638..51ff211f6 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -129,7 +129,7 @@ impl TransactionSet { }; for (sender, nonce) in to_drop { - let order = self.drop(&sender, &nonce).expect("Droping transaction failed."); + let order = self.drop(&sender, &nonce).expect("Dropping transaction failed."); by_hash.remove(&order.hash).expect("Inconsistency in queue."); } } @@ -322,6 +322,12 @@ impl TransactionQueue { fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) where T: Fn(&Address) -> U256 { + + if self.by_hash.get(&tx.hash()).is_some() { + // Transaction is already imported. 
+ return; + } + let nonce = tx.nonce(); let address = tx.sender(); @@ -355,7 +361,6 @@ impl TransactionQueue { // But maybe there are some more items waiting in future? let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); - // Enforce limit self.current.enforce_limit(&mut self.by_hash); } } @@ -636,7 +641,26 @@ mod test { } #[test] - fn should_accept_same_transaction_twice() { + fn should_not_insert_same_transaction_twice() { + // given + let nonce = |a: &Address| default_nonce(a) + U256::one(); + let mut txq = TransactionQueue::new(); + let (_tx1, tx2) = new_txs(U256::from(1)); + txq.add(tx2.clone(), &default_nonce); + assert_eq!(txq.status().future, 1); + assert_eq!(txq.status().pending, 0); + + // when + txq.add(tx2.clone(), &nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 1); + assert_eq!(stats.pending, 0); + } + + #[test] + fn should_accept_same_transaction_twice_if_removed() { // given let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::from(1)); From 765d7179f583245a432ec1f6e7c684836c60edd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 15:43:04 +0100 Subject: [PATCH 03/19] Failing tests for transaction queue --- sync/src/transaction_queue.rs | 77 ++++++++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 51ff211f6..503af7b16 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -129,7 +129,7 @@ impl TransactionSet { }; for (sender, nonce) in to_drop { - let order = self.drop(&sender, &nonce).expect("Dropping transaction failed."); + let order = self.drop(&sender, &nonce).expect("Dropping transaction found in priority queue failed."); by_hash.remove(&order.hash).expect("Inconsistency in queue."); } } @@ -325,6 +325,7 @@ impl 
TransactionQueue { if self.by_hash.get(&tx.hash()).is_some() { // Transaction is already imported. + trace!(target: "sync", "Dropping already imported transaction with hash: {:?}", tx.hash()); return; } @@ -370,6 +371,7 @@ impl TransactionQueue { mod test { extern crate rustc_serialize; use self::rustc_serialize::hex::FromHex; + use std::ops::Deref; use std::collections::{HashMap, BTreeSet}; use util::crypto::KeyPair; use util::numbers::{U256, Uint}; @@ -702,4 +704,77 @@ mod test { assert_eq!(stats.pending, 2); } + #[test] + fn should_replace_same_transaction_when_has_higher_fee() { + // given + let mut txq = TransactionQueue::new(); + let keypair = KeyPair::create().unwrap(); + let tx = new_unsigned_tx(U256::from(123)).sign(&keypair.secret()); + let tx2 = { + let mut tx2 = tx.deref().clone(); + tx2.gas_price = U256::from(200); + tx2.sign(&keypair.secret()) + }; + + // when + txq.add(tx, &default_nonce); + txq.add(tx2, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.pending, 1); + assert_eq!(stats.future, 0); + assert_eq!(txq.top_transactions(1)[0].gas_price, U256::from(200)); + } + + #[test] + fn should_replace_same_transaction_when_importing_to_futures() { + // given + let mut txq = TransactionQueue::new(); + let keypair = KeyPair::create().unwrap(); + let tx0 = new_unsigned_tx(U256::from(123)).sign(&keypair.secret()); + let tx1 = { + let mut tx1 = tx0.deref().clone(); + tx1.nonce = U256::from(124); + tx1.sign(&keypair.secret()) + }; + let tx2 = { + let mut tx2 = tx1.deref().clone(); + tx2.gas_price = U256::from(200); + tx2.sign(&keypair.secret()) + }; + + // when + txq.add(tx1, &default_nonce); + txq.add(tx2, &default_nonce); + assert_eq!(txq.status().future, 1); + txq.add(tx0, &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 2); + assert_eq!(txq.top_transactions(2)[1].gas_price, U256::from(200)); + } + + #[test] + fn 
should_recalculate_height_when_removing_from_future() { + // given + let previous_nonce = |a: &Address| default_nonce(a) - U256::one(); + let mut txq = TransactionQueue::new(); + let (tx1, tx2) = new_txs(U256::one()); + txq.add(tx1.clone(), &previous_nonce); + txq.add(tx2, &previous_nonce); + assert_eq!(txq.status().future, 2); + + // when + txq.remove(&tx1.hash(), &default_nonce); + + // then + let stats = txq.status(); + assert_eq!(stats.future, 0); + assert_eq!(stats.pending, 1); + } + + } From 6afa1c85b7862e504ee254ffb123ef14e1607213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 16:20:41 +0100 Subject: [PATCH 04/19] Replacing transactions instead of just inserting --- sync/src/transaction_queue.rs | 42 ++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 503af7b16..e05210af2 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -108,9 +108,9 @@ struct TransactionSet { } impl TransactionSet { - fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) { + fn insert(&mut self, sender: Address, nonce: U256, order: TransactionOrder) -> Option { self.by_priority.insert(order.clone()); - self.by_address.insert(sender, nonce, order); + self.by_address.insert(sender, nonce, order) } fn enforce_limit(&mut self, by_hash: &mut HashMap) { @@ -332,38 +332,54 @@ impl TransactionQueue { let nonce = tx.nonce(); let address = tx.sender(); + let state_nonce = fetch_nonce(&address); let next_nonce = self.last_nonces .get(&address) .cloned() - .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); + .map_or(state_nonce, |n| n + U256::one()); // Check height if nonce > next_nonce { - let order = TransactionOrder::for_transaction(&tx, next_nonce); - // Insert to by_hash - self.by_hash.insert(tx.hash(), tx); // We have a gap - put to future - self.future.insert(address, nonce, 
order); + Self::replace_transaction(tx, next_nonce, &mut self.future, &mut self.by_hash); self.future.enforce_limit(&mut self.by_hash); return; - } else if next_nonce > nonce { + } else if nonce < state_nonce { // Droping transaction trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } let base_nonce = fetch_nonce(&address); - let order = TransactionOrder::for_transaction(&tx, base_nonce); - // Insert to by_hash - self.by_hash.insert(tx.hash(), tx); - // Insert to current - self.current.insert(address.clone(), nonce, order); + Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); // But maybe there are some more items waiting in future? let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); self.current.enforce_limit(&mut self.by_hash); } + + fn replace_transaction(tx: VerifiedTransaction, base_nonce: U256, set: &mut TransactionSet, by_hash: &mut HashMap) { + let order = TransactionOrder::for_transaction(&tx, base_nonce); + let hash = tx.hash(); + let address = tx.sender(); + let nonce = tx.nonce(); + + by_hash.insert(hash.clone(), tx); + if let Some(old) = set.insert(address, nonce, order.clone()) { + // There was already transaction in queue. 
Let's check which one should stay + if old.cmp(&order) == Ordering::Greater { + assert!(old.nonce_height == order.nonce_height, "Both transactions should have the same height."); + // Put back old transaction since it has greater priority (higher gas_price) + set.insert(address, nonce, old); + by_hash.remove(&hash); + } else { + // Make sure we remove old transaction entirely + set.by_priority.remove(&old); + by_hash.remove(&old.hash); + } + } + } } From 0a7fc4af738ed597239036e5c39fca0473c61512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 16:42:34 +0100 Subject: [PATCH 05/19] Recalculating heights in future when removing transaction --- sync/src/transaction_queue.rs | 68 ++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index e05210af2..24bb772d7 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -238,26 +238,50 @@ impl TransactionQueue { // We don't know this transaction return; } + let transaction = transaction.unwrap(); let sender = transaction.sender(); let nonce = transaction.nonce(); + let current_nonce = fetch_nonce(&sender); // Remove from future - self.future.drop(&sender, &nonce); - - // Remove from current - let order = self.current.drop(&sender, &nonce); - if order.is_none() { + let order = self.future.drop(&sender, &nonce); + if order.is_some() { + self.recalculate_future_for_sender(&sender, current_nonce); + // And now lets check if there is some chain of transactions in future + // that should be placed in current + self.move_future_txs(sender.clone(), current_nonce, current_nonce); return; } - // Let's remove transactions where tx.nonce < current_nonce - // and if there are any future transactions matching current_nonce+1 - move to current - let current_nonce = fetch_nonce(&sender); - // We will either move transaction to future or remove it completely - // so there 
will be no transactions from this sender in current - self.last_nonces.remove(&sender); + // Remove from current + let order = self.current.drop(&sender, &nonce); + if order.is_some() { + // We will either move transaction to future or remove it completely + // so there will be no transactions from this sender in current + self.last_nonces.remove(&sender); + // This should move all current transactions to future and remove old transactions + self.move_all_to_future(&sender, current_nonce); + // And now lets check if there is some chain of transactions in future + // that should be placed in current. It should also update last_nonces. + self.move_future_txs(sender.clone(), current_nonce, current_nonce); + return; + } + } + fn recalculate_future_for_sender(&mut self, sender: &Address, current_nonce: U256) { + // We need to drain all transactions for current sender from future and reinsert them with updated height + let all_nonces_from_sender = match self.future.by_address.row(&sender) { + Some(row_map) => row_map.keys().cloned().collect::>(), + None => vec![], + }; + for k in all_nonces_from_sender { + let order = self.future.drop(&sender, &k).unwrap(); + self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); + } + } + + fn move_all_to_future(&mut self, sender: &Address, current_nonce: U256) { let all_nonces_from_sender = match self.current.by_address.row(&sender) { Some(row_map) => row_map.keys().cloned().collect::>(), None => vec![], @@ -273,14 +297,9 @@ impl TransactionQueue { } } self.future.enforce_limit(&mut self.by_hash); - - // And now lets check if there is some chain of transactions in future - // that should be placed in current - if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { - self.last_nonces.insert(sender, new_current_top); - } } + /// Returns top transactions from the queue pub fn top_transactions(&self, size: usize) -> Vec { self.current.by_priority @@ -299,11 +318,11 @@ impl 
TransactionQueue { self.last_nonces.clear(); } - fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option { + fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { { let by_nonce = self.future.by_address.row_mut(&address); if let None = by_nonce { - return None; + return; } let mut by_nonce = by_nonce.unwrap(); while let Some(order) = by_nonce.remove(¤t_nonce) { @@ -316,8 +335,8 @@ impl TransactionQueue { } } self.future.by_address.clear_if_empty(&address); - // Returns last inserted nonce - Some(current_nonce - U256::one()) + // Update last inserted nonce + self.last_nonces.insert(address, current_nonce - U256::one()); } fn import_tx(&mut self, tx: VerifiedTransaction, fetch_nonce: &T) @@ -353,9 +372,9 @@ impl TransactionQueue { let base_nonce = fetch_nonce(&address); Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); + self.last_nonces.insert(address.clone(), nonce); // But maybe there are some more items waiting in future? 
- let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); - self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); + self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); self.current.enforce_limit(&mut self.by_hash); } @@ -777,6 +796,7 @@ mod test { fn should_recalculate_height_when_removing_from_future() { // given let previous_nonce = |a: &Address| default_nonce(a) - U256::one(); + let next_nonce = |a: &Address| default_nonce(a) + U256::one(); let mut txq = TransactionQueue::new(); let (tx1, tx2) = new_txs(U256::one()); txq.add(tx1.clone(), &previous_nonce); @@ -784,7 +804,7 @@ mod test { assert_eq!(txq.status().future, 2); // when - txq.remove(&tx1.hash(), &default_nonce); + txq.remove(&tx1.hash(), &next_nonce); // then let stats = txq.status(); From cc3839ae5744ab887c701c9007eda6162cddff2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 16:46:04 +0100 Subject: [PATCH 06/19] Revert "Revert "Transaction Queue integration"" This reverts commit d330f0b7b7fa5db1b5891d7c1e4e61136603fed5. 
Conflicts: sync/src/transaction_queue.rs --- Cargo.lock | 19 ++++++ ethcore/src/client.rs | 21 +++++-- ethcore/src/service.rs | 2 +- sync/Cargo.toml | 1 + sync/src/chain.rs | 107 ++++++++++++++++++++++++++++------ sync/src/lib.rs | 14 +++-- sync/src/tests/chain.rs | 51 ++++++++-------- sync/src/tests/helpers.rs | 61 ++++++++++++++----- sync/src/transaction_queue.rs | 17 +++--- 9 files changed, 217 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55ed996ed..510e69b59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,14 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "deque" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "docopt" version = "0.6.78" @@ -285,6 +293,7 @@ dependencies = [ "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -655,6 +664,16 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rayon" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "regex" version = "0.1.54" diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 858185873..852ba6a36 100644 --- 
a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -138,6 +138,9 @@ pub trait BlockChainClient : Sync + Send { /// Get block total difficulty. fn block_total_difficulty(&self, id: BlockId) -> Option; + /// Get address nonce. + fn nonce(&self, address: &Address) -> U256; + /// Get block hash. fn block_hash(&self, id: BlockId) -> Option; @@ -365,18 +368,14 @@ impl Client where V: Verifier { bad_blocks.insert(header.hash()); continue; } - let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { bad_blocks.insert(header.hash()); break; } - - // Insert block - let closed_block = closed_block.unwrap(); - self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone()); good_blocks.push(header.hash()); + // Are we committing an era? let ancient = if header.number() >= HISTORY { let n = header.number() - HISTORY; let chain = self.chain.read().unwrap(); @@ -386,10 +385,16 @@ impl Client where V: Verifier { }; // Commit results + let closed_block = closed_block.unwrap(); + let receipts = closed_block.block().receipts().clone(); closed_block.drain() .commit(header.number(), &header.hash(), ancient) .expect("State DB commit failed."); + // And update the chain + self.chain.write().unwrap() + .insert_block(&block.bytes, receipts); + self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } @@ -408,7 +413,7 @@ impl Client where V: Verifier { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - bad: bad_blocks, + retracted: bad_blocks, })).unwrap(); } } @@ -581,6 +586,10 @@ impl BlockChainClient for Client where V: Verifier { Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) } + fn nonce(&self, address: &Address) -> U256 { + self.state().nonce(address) + } + fn block_hash(&self, id: BlockId) -> 
Option { let chain = self.chain.read().unwrap(); Self::block_hash(&chain, id) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 756d02407..a80adb0ba 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,7 +30,7 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain - bad: Vec, + retracted: Vec, }, /// A block is ready BlockVerified, diff --git a/sync/Cargo.toml b/sync/Cargo.toml index f10a772e3..0097cd47e 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -17,6 +17,7 @@ time = "0.1.34" rand = "0.3.13" heapsize = "0.3" rustc-serialize = "0.3" +rayon = "0.3.1" [features] default = [] diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 530cfa424..ddf30854a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -30,14 +30,17 @@ /// use util::*; +use rayon::prelude::*; use std::mem::{replace}; -use ethcore::views::{HeaderView}; +use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; use ethcore::block::Block; +use ethcore::transaction::SignedTransaction; use io::SyncIo; +use transaction_queue::TransactionQueue; use time; use super::SyncConfig; @@ -209,6 +212,8 @@ pub struct ChainSync { max_download_ahead_blocks: usize, /// Network ID network_id: U256, + /// Transactions Queue + transaction_queue: Mutex, } type RlpResponseResult = Result, PacketDecodeError>; @@ -234,6 +239,7 @@ impl ChainSync { last_send_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, + transaction_queue: Mutex::new(TransactionQueue::new()), } } @@ -292,6 +298,7 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; + 
self.transaction_queue.lock().unwrap().clear(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -484,7 +491,7 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - if self.current_base_block() < header.number { + if self.current_base_block() < header.number { self.last_imported_block = Some(header.number); self.remove_downloaded_blocks(header.number); } @@ -921,8 +928,16 @@ impl ChainSync { } } /// Called when peer sends us new transactions - fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - Ok(()) + fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let chain = io.chain(); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); + let fetch_latest_nonce = |a : &Address| chain.nonce(a); + for i in 0..item_count { + let tx: SignedTransaction = try!(r.val_at(i)); + self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); + } + Ok(()) } /// Send Status message @@ -1248,6 +1263,37 @@ impl ChainSync { } self.last_send_block_number = chain.best_block_number; } + + /// called when block is imported to chain, updates transactions queue + pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { + fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { + let block = chain + .block(BlockId::Hash(hash.clone())) + // Client should send message after commit to db and inserting to chain. 
+ .expect("Expected in-chain blocks."); + let block = BlockView::new(&block); + block.transactions() + } + + + let chain = io.chain(); + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); + + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + retracted.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } } #[cfg(test)] @@ -1388,7 +1434,7 @@ mod tests { #[test] fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let chain_info = client.chain_info(); @@ -1402,7 +1448,7 @@ mod tests { #[test] fn calculates_tree_for_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(15, false); + client.add_blocks(15, EachBlockWith::Uncle); let start = client.block_hash_delta_minus(4); let end = client.block_hash_delta_minus(2); @@ -1419,7 +1465,7 @@ mod tests { #[test] fn sends_new_hashes_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1438,7 +1484,7 @@ mod tests { #[test] fn sends_latest_block_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync 
= dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1456,7 +1502,7 @@ mod tests { #[test] fn handles_peer_new_block_mallformed() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, EachBlockWith::Uncle); let block_data = get_dummy_block(11, client.chain_info().best_block_hash); @@ -1474,7 +1520,7 @@ mod tests { #[test] fn handles_peer_new_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, EachBlockWith::Uncle); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); @@ -1492,7 +1538,7 @@ mod tests { #[test] fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1508,7 +1554,7 @@ mod tests { #[test] fn handles_peer_new_hashes() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1524,7 +1570,7 @@ mod tests { #[test] fn handles_peer_new_hashes_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1542,7 +1588,7 @@ mod tests { #[test] fn hashes_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = 
dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1560,7 +1606,7 @@ mod tests { #[test] fn block_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1573,10 +1619,37 @@ mod tests { assert!(result.is_ok()); } + #[test] + fn should_add_transactions_to_queue() { + // given + let mut client = TestBlockChainClient::new(); + client.add_blocks(98, EachBlockWith::Uncle); + client.add_blocks(1, EachBlockWith::UncleAndTransaction); + client.add_blocks(1, EachBlockWith::Transaction); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + + let good_blocks = vec![client.block_hash_delta_minus(2)]; + let retracted_blocks = vec![client.block_hash_delta_minus(1)]; + + let mut queue = VecDeque::new(); + let io = TestIo::new(&mut client, &mut queue, None); + + // when + sync.chain_new_blocks(&io, &[], &good_blocks); + assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); + assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); + sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); + + // then + let status = sync.transaction_queue.lock().unwrap().status(); + assert_eq!(status.pending, 1); + assert_eq!(status.future, 0); + } + #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1600,7 +1673,7 @@ mod tests { #[test] fn returns_requested_block_headers_reverse() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let 
io = TestIo::new(&mut client, &mut queue, None); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 74541660d..d67a09f3b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -54,6 +54,7 @@ extern crate ethcore; extern crate env_logger; extern crate time; extern crate rand; +extern crate rayon; #[macro_use] extern crate heapsize; @@ -70,8 +71,7 @@ use io::NetSyncIo; mod chain; mod io; mod range_collection; -// TODO [todr] Made public to suppress dead code warnings -pub mod transaction_queue; +mod transaction_queue; #[cfg(test)] mod tests; @@ -153,8 +153,14 @@ impl NetworkProtocolHandler for EthSync { } fn message(&self, io: &NetworkContext, message: &SyncMessage) { - if let SyncMessage::BlockVerified = *message { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); + match *message { + SyncMessage::BlockVerified => { + self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); + }, + SyncMessage::NewChainBlocks { ref good, ref retracted } => { + let sync_io = NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); + } } } } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index b01c894a0..58f50916e 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -24,8 +24,8 @@ use super::helpers::*; fn two_peers() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); @@ -35,8 +35,8 @@ fn two_peers() { fn status_after_sync() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - 
net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); @@ -45,8 +45,8 @@ fn status_after_sync() { #[test] fn takes_few_steps() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, false); - net.peer_mut(2).chain.add_blocks(100, false); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); let total_steps = net.sync(); assert!(total_steps < 7); } @@ -56,8 +56,9 @@ fn empty_blocks() { ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { - net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); - net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); + let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle }; + net.peer_mut(1).chain.add_blocks(5, with.clone()); + net.peer_mut(2).chain.add_blocks(5, with); } net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); @@ -68,14 +69,14 @@ fn empty_blocks() { fn forked() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, false); - net.peer_mut(1).chain.add_blocks(300, false); - net.peer_mut(2).chain.add_blocks(300, false); - net.peer_mut(0).chain.add_blocks(100, true); //fork - net.peer_mut(1).chain.add_blocks(200, false); - net.peer_mut(2).chain.add_blocks(200, false); - net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, true); + net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork + 
net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); @@ -87,8 +88,8 @@ fn forked() { #[test] fn restart() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync_steps(8); @@ -109,8 +110,8 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, false); - net.peer_mut(1).chain.add_blocks(1, false); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle); net.start(); @@ -123,10 +124,10 @@ fn status_packet() { #[test] fn propagate_hashes() { let mut net = TestNet::new(6); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, false); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -149,10 +150,10 @@ fn propagate_hashes() { #[test] fn propagate_blocks() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, false); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -164,7 +165,7 @@ fn propagate_blocks() { #[test] fn 
restart_on_malformed_block() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.peer_mut(1).chain.corrupt_block(6); net.sync_steps(10); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index e170a4a85..5b53ad90b 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -22,7 +22,7 @@ use io::SyncIo; use chain::ChainSync; use ::SyncConfig; use ethcore::receipt::Receipt; -use ethcore::transaction::LocalizedTransaction; +use ethcore::transaction::{LocalizedTransaction, Transaction, Action}; use ethcore::filter::Filter; use ethcore::log_entry::LocalizedLogEntry; @@ -34,6 +34,14 @@ pub struct TestBlockChainClient { pub difficulty: RwLock, } +#[derive(Clone)] +pub enum EachBlockWith { + Nothing, + Uncle, + Transaction, + UncleAndTransaction +} + impl TestBlockChainClient { pub fn new() -> TestBlockChainClient { @@ -44,30 +52,53 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), }; - client.add_blocks(1, true); // add genesis block + client.add_blocks(1, EachBlockWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().unwrap().clone(); client } - pub fn add_blocks(&mut self, count: usize, empty: bool) { + pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); header.parent_hash = self.last_hash.read().unwrap().clone(); header.number = n as BlockNumber; - let mut uncles = RlpStream::new_list(if empty {0} else {1}); - if !empty { - let mut uncle_header = BlockHeader::new(); - uncle_header.difficulty = From::from(n); - uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); - uncle_header.number = n as BlockNumber; - uncles.append(&uncle_header); - header.uncles_hash = uncles.as_raw().sha3(); - } + 
let uncles = match with { + EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { + let mut uncles = RlpStream::new_list(1); + let mut uncle_header = BlockHeader::new(); + uncle_header.difficulty = From::from(n); + uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.number = n as BlockNumber; + uncles.append(&uncle_header); + header.uncles_hash = uncles.as_raw().sha3(); + uncles + }, + _ => RlpStream::new_list(0) + }; + let txs = match with { + EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { + let mut txs = RlpStream::new_list(1); + let keypair = KeyPair::create().unwrap(); + let tx = Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::one(), + nonce: U256::zero() + }; + let signed_tx = tx.sign(&keypair.secret()); + txs.append(&signed_tx); + txs.out() + }, + _ => rlp::NULL_RLP.to_vec() + }; + let mut rlp = RlpStream::new_list(3); rlp.append(&header); - rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(&txs, 1); rlp.append_raw(uncles.as_raw(), 1); self.import_block(rlp.as_raw().to_vec()).unwrap(); } @@ -109,6 +140,10 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } + fn nonce(&self, _address: &Address) -> U256 { + U256::zero() + } + fn code(&self, _address: &Address) -> Option { unimplemented!(); } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 100435530..8b38c64ad 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -221,19 +221,19 @@ impl TransactionQueue { /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, txs: &[H256], fetch_nonce: T) + pub fn remove_all(&mut self, transaction_hashes: &[H256], fetch_nonce: T) where T: Fn(&Address) -> U256 { - for tx in txs { - self.remove(&tx, &fetch_nonce); + for hash 
in transaction_hashes { + self.remove(&hash, &fetch_nonce); } } /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, hash: &H256, fetch_nonce: &T) + pub fn remove(&mut self, transaction_hash: &H256, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - let transaction = self.by_hash.remove(hash); + let transaction = self.by_hash.remove(transaction_hash); if transaction.is_none() { // We don't know this transaction return; @@ -244,7 +244,6 @@ impl TransactionQueue { let nonce = transaction.nonce(); let current_nonce = fetch_nonce(&sender); - println!("Removing tx: {:?}", transaction.transaction); // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { @@ -292,7 +291,6 @@ impl TransactionQueue { // Goes to future or is removed let order = self.current.drop(&sender, &k).unwrap(); if k >= current_nonce { - println!("Moving to future: {:?}", order); self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); } else { self.by_hash.remove(&order.hash); @@ -302,7 +300,7 @@ impl TransactionQueue { // And now lets check if there is some chain of transactions in future // that should be placed in current - if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) { + if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { self.last_nonces.insert(sender, new_current_top); } } @@ -337,7 +335,6 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - println!("Moved: {:?}", order); let order = order.update_height(current_nonce.clone(), first_nonce); self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); @@ -366,7 +363,6 @@ impl TransactionQueue { .cloned() .map_or(state_nonce, |n| n + U256::one()); - println!("Expected 
next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { // We have a gap - put to future @@ -375,6 +371,7 @@ impl TransactionQueue { return; } else if nonce < state_nonce { // Droping transaction + trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } From 8915974cf0aba8e26f27294cb3510edd600252fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 16:48:03 +0100 Subject: [PATCH 07/19] Fixing compilation --- sync/src/transaction_queue.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 8b38c64ad..24bb772d7 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -297,12 +297,6 @@ impl TransactionQueue { } } self.future.enforce_limit(&mut self.by_hash); - - // And now lets check if there is some chain of transactions in future - // that should be placed in current - if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { - self.last_nonces.insert(sender, new_current_top); - } } From c13afcf40485411788b6817cb324a23f9de32d59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 17:06:04 +0100 Subject: [PATCH 08/19] Removing assertion and just comparing fees --- sync/src/transaction_queue.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 24bb772d7..f14d94c8c 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -387,8 +387,9 @@ impl TransactionQueue { by_hash.insert(hash.clone(), tx); if let Some(old) = set.insert(address, nonce, order.clone()) { // There was already transaction in queue. 
Let's check which one should stay - if old.cmp(&order) == Ordering::Greater { - assert!(old.nonce_height == order.nonce_height, "Both transactions should have the same height."); + let old_fee = old.gas_price; + let new_fee = order.gas_price; + if old_fee.cmp(&new_fee) == Ordering::Greater { // Put back old transaction since it has greater priority (higher gas_price) set.insert(address, nonce, old); by_hash.remove(&hash); From 18cbea394d53abde1780efbb1cc3ea0c8e678ce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 17:14:48 +0100 Subject: [PATCH 09/19] Small renaming --- sync/src/transaction_queue.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index f14d94c8c..463607cae 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -247,10 +247,10 @@ impl TransactionQueue { // Remove from future let order = self.future.drop(&sender, &nonce); if order.is_some() { - self.recalculate_future_for_sender(&sender, current_nonce); + self.update_future(&sender, current_nonce); // And now lets check if there is some chain of transactions in future // that should be placed in current - self.move_future_txs(sender.clone(), current_nonce, current_nonce); + self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); return; } @@ -264,12 +264,12 @@ impl TransactionQueue { self.move_all_to_future(&sender, current_nonce); // And now lets check if there is some chain of transactions in future // that should be placed in current. It should also update last_nonces. 
- self.move_future_txs(sender.clone(), current_nonce, current_nonce); + self.move_matching_future_to_current(sender.clone(), current_nonce, current_nonce); return; } } - fn recalculate_future_for_sender(&mut self, sender: &Address, current_nonce: U256) { + fn update_future(&mut self, sender: &Address, current_nonce: U256) { // We need to drain all transactions for current sender from future and reinsert them with updated height let all_nonces_from_sender = match self.future.by_address.row(&sender) { Some(row_map) => row_map.keys().cloned().collect::>(), @@ -318,7 +318,7 @@ impl TransactionQueue { self.last_nonces.clear(); } - fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { + fn move_matching_future_to_current(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) { { let by_nonce = self.future.by_address.row_mut(&address); if let None = by_nonce { @@ -374,7 +374,7 @@ impl TransactionQueue { Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); self.last_nonces.insert(address.clone(), nonce); // But maybe there are some more items waiting in future? 
- self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); + self.move_matching_future_to_current(address.clone(), nonce + U256::one(), base_nonce); self.current.enforce_limit(&mut self.by_hash); } From 4a53d62be436513d81209db0d4a88ccc0f3aef06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 17:41:35 +0100 Subject: [PATCH 10/19] Fixing inconsistency when replacing transactions in queue --- sync/src/transaction_queue.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 463607cae..b98772199 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -348,8 +348,8 @@ impl TransactionQueue { return; } - let nonce = tx.nonce(); let address = tx.sender(); + let nonce = tx.nonce(); let state_nonce = fetch_nonce(&address); let next_nonce = self.last_nonces @@ -370,7 +370,6 @@ impl TransactionQueue { } let base_nonce = fetch_nonce(&address); - Self::replace_transaction(tx, base_nonce.clone(), &mut self.current, &mut self.by_hash); self.last_nonces.insert(address.clone(), nonce); // But maybe there are some more items waiting in future? 
@@ -391,7 +390,9 @@ impl TransactionQueue { let new_fee = order.gas_price; if old_fee.cmp(&new_fee) == Ordering::Greater { // Put back old transaction since it has greater priority (higher gas_price) - set.insert(address, nonce, old); + set.by_address.insert(address, nonce, old); + // and remove new one + set.by_priority.remove(&order); by_hash.remove(&hash); } else { // Make sure we remove old transaction entirely From 57e6e1e1b59188cdf8d378b81c33842d5c5feaf7 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Sat, 5 Mar 2016 20:15:19 +0300 Subject: [PATCH 11/19] [ci ship] redundant lines --- sync/src/transaction_queue.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index b98772199..3e0d931b5 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -813,6 +813,4 @@ mod test { assert_eq!(stats.future, 0); assert_eq!(stats.pending, 1); } - - } From aaf2e0c3fbdc0cd3e125988e96987486d73bf395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sun, 6 Mar 2016 11:04:13 +0100 Subject: [PATCH 12/19] Locking outside of loop --- sync/src/chain.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ddf30854a..a8bcb653f 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -933,9 +933,11 @@ impl ChainSync { let item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); let fetch_latest_nonce = |a : &Address| chain.nonce(a); + + let mut transaction_queue = self.transaction_queue.lock().unwrap(); for i in 0..item_count { let tx: SignedTransaction = try!(r.val_at(i)); - self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); + transaction_queue.add(tx, &fetch_latest_nonce); } Ok(()) } From e91de785281d59b591079c16c798d7252991b593 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sun, 6 Mar 2016 11:11:59 +0100 Subject: [PATCH 
13/19] Renaming back bad as retracted --- ethcore/src/client.rs | 4 +++- ethcore/src/service.rs | 2 ++ sync/src/chain.rs | 10 +++++----- sync/src/lib.rs | 4 ++-- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 852ba6a36..123847a7f 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -413,7 +413,9 @@ impl Client where V: Verifier { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - retracted: bad_blocks, + bad: bad_blocks, + // TODO [todr] were to take those from? + retracted: vec![], })).unwrap(); } } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index a80adb0ba..443d09e3b 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,6 +30,8 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain + bad: Vec, + /// Hashes of blocks that were removed from canonical chain retracted: Vec, }, /// A block is ready diff --git a/sync/src/chain.rs b/sync/src/chain.rs index a8bcb653f..fcc9f49c8 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1267,7 +1267,7 @@ impl ChainSync { } /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { + pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) @@ -1280,14 +1280,14 @@ impl ChainSync { let chain = io.chain(); let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); good.for_each(|txs| { let mut transaction_queue = 
self.transaction_queue.lock().unwrap(); let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); }); - retracted.for_each(|txs| { + bad.for_each(|txs| { // populate sender for tx in &txs { let _sender = tx.sender(); @@ -1637,10 +1637,10 @@ mod tests { let io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&io, &[], &good_blocks); + sync.chain_new_blocks(&io, &[], &good_blocks, &[]); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); + sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks, &[]); // then let status = sync.transaction_queue.lock().unwrap().status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index d67a09f3b..8a30385a2 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -157,9 +157,9 @@ impl NetworkProtocolHandler for EthSync { SyncMessage::BlockVerified => { self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); }, - SyncMessage::NewChainBlocks { ref good, ref retracted } => { + SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); + self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad, retracted); } } } From e83f8561041216a56a47cba39abd9ed3a0385961 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 7 Mar 2016 12:16:37 +0100 Subject: [PATCH 14/19] Merging chain_blocks_verified to chain_new_blocks --- sync/src/chain.rs | 78 +++++++++++++++++++++------------------ sync/src/lib.rs | 10 ++--- sync/src/tests/chain.rs | 8 ++-- sync/src/tests/helpers.rs | 4 +- 4 files changed, 52 insertions(+), 48 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index fc0a19aba..e598c4572 
100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -207,7 +207,7 @@ pub struct ChainSync { /// True if common block for our and remote chain has been found have_common_block: bool, /// Last propagated block number - last_send_block_number: BlockNumber, + last_sent_block_number: BlockNumber, /// Max blocks to download ahead max_download_ahead_blocks: usize, /// Network ID @@ -236,7 +236,7 @@ impl ChainSync { last_imported_hash: None, syncing_difficulty: U256::from(0u64), have_common_block: false, - last_send_block_number: 0, + last_sent_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, transaction_queue: Mutex::new(TransactionQueue::new()), @@ -1248,26 +1248,25 @@ impl ChainSync { sent } + fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { + let chain_info = io.chain().chain_info(); + if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { + let blocks = self.propagate_blocks(&chain_info, io); + let hashes = self.propagate_new_hashes(&chain_info, io); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } + self.last_sent_block_number = chain_info.best_block_number; + } + /// Maintain other peers. 
Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.check_resume(io); } - /// should be called once chain has new block, triggers the latest block propagation - pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { - let chain = io.chain().chain_info(); - if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagate_blocks(&chain, io); - let hashes = self.propagate_new_hashes(&chain, io); - if blocks != 0 || hashes != 0 { - trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); - } - } - self.last_send_block_number = chain.best_block_number; - } - - /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { + /// called when block is imported to chain, updates transactions queue and propagates the blocks + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) @@ -1278,24 +1277,31 @@ impl ChainSync { } - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + { + let chain = io.chain(); + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - bad.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - 
transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + bad.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } + + // Propagate latests blocks + self.propagate_latest_blocks(io); + // TODO [todr] propagate transactions? } + } #[cfg(test)] @@ -1634,13 +1640,13 @@ mod tests { let retracted_blocks = vec![client.block_hash_delta_minus(1)]; let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); + let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&io, &[], &good_blocks, &[]); + sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks, &[]); + sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]); // then let status = sync.transaction_queue.lock().unwrap().status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 8a30385a2..b5869642c 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -154,13 +154,11 @@ impl NetworkProtocolHandler for EthSync { fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::BlockVerified => { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); - }, SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { - let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad, retracted); - } + let mut sync_io = 
NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); + }, + _ => {/* Ignore other messages */}, } } } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 58f50916e..855aa79a6 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -129,8 +129,8 @@ fn propagate_hashes() { net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); // 5 peers to sync assert_eq!(5, net.peer(0).queue.len()); @@ -154,8 +154,8 @@ fn propagate_blocks() { net.sync(); net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); assert!(!net.peer(0).queue.is_empty()); // NEW_BLOCK_PACKET diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 5b53ad90b..d01dba0b2 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -455,8 +455,8 @@ impl TestNet { self.peers.iter().all(|p| p.queue.is_empty()) } - pub fn trigger_block_verified(&mut self, peer_id: usize) { + pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]); } } From 6ad0ba8fe24bf35c9a74b48a25c6e84dc132429f Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 17:11:15 +0400 Subject: [PATCH 15/19] basic commands --- Cargo.lock | 20 ++++++++++++++++++++ Cargo.toml | 1 + parity/main.rs | 34 ++++++++++++++++++++++++++++++++++ 
util/src/keys/store.rs | 1 + 4 files changed, 56 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 55ed996ed..65ca8f566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,7 @@ dependencies = [ "fdlimit 0.1.0", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "number_prefix 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rpassword 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -680,6 +681,17 @@ dependencies = [ "librocksdb-sys 0.2.1 (git+https://github.com/arkpar/rust-rocksdb.git)", ] +[[package]] +name = "rpassword" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "termios 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rust-crypto" version = "0.2.34" @@ -813,6 +825,14 @@ dependencies = [ "winapi 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "termios" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.34" diff --git a/Cargo.toml b/Cargo.toml index 9b8ec6405..0852a16bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ fdlimit = { path = "util/fdlimit" } daemonize = "0.2" ethcore-devtools = { path = "devtools" } number_prefix = "0.2" +rpassword = "0.1" [features] default = ["rpc"] diff --git a/parity/main.rs b/parity/main.rs index 296e1df65..a442f4fdb 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -32,6 +32,7 
@@ extern crate fdlimit; extern crate daemonize; extern crate time; extern crate number_prefix; +extern crate rpassword; #[cfg(feature = "rpc")] extern crate ethcore_rpc as rpc; @@ -70,6 +71,7 @@ Parity. Ethereum Client. Usage: parity daemon [options] [ --no-bootstrap | ... ] + parity account parity [options] [ --no-bootstrap | ... ] Protocol Options: @@ -126,8 +128,10 @@ Miscellaneous Options: #[derive(Debug, RustcDecodable)] struct Args { cmd_daemon: bool, + cmd_account: bool, arg_pid_file: String, arg_enode: Vec, + arg_command: String, flag_chain: String, flag_testnet: bool, flag_datadir: String, @@ -337,9 +341,39 @@ impl Configuration { .start() .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); } + if self.args.cmd_account { + self.execute_account_cli(&self.args.arg_command); + return; + } self.execute_client(); } + fn execute_account_cli(&self, command: &str) { + use util::keys::store::SecretStore; + use rpassword::read_password; + let mut secret_store = SecretStore::new(); + if command == "new" { + println!("Please note that password is NOT RECOVERABLE."); + println!("Type password: "); + let password = read_password().unwrap(); + println!("Repeat password: "); + let password_repeat = read_password().unwrap(); + if password != password_repeat { + println!("Passwords do not match!"); + return; + } + println!("New account address:"); + let new_address = secret_store.new_account(&password).unwrap(); + println!("{:?}", new_address); + } + if command == "list" { + println!("Known addresses:"); + for &(addr, _) in secret_store.accounts().unwrap().iter() { + println!("{:?}", addr); + } + } + } + fn execute_client(&self) { // Setup logging setup_log(&self.args.flag_logging); diff --git a/util/src/keys/store.rs b/util/src/keys/store.rs index 625d6fd8f..dcc165259 100644 --- a/util/src/keys/store.rs +++ b/util/src/keys/store.rs @@ -84,6 +84,7 @@ impl SecretStore { let mut path = ::std::env::home_dir().expect("Failed to get home dir"); path.push(".parity"); 
path.push("keys"); + ::std::fs::create_dir_all(&path).expect("Should panic since it is critical to be able to access home dir"); Self::new_in(&path) } From 7ff4d145448487685be000f572f790c3bcef5ae9 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 19:27:44 +0400 Subject: [PATCH 16/19] adding return to if branch --- parity/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parity/main.rs b/parity/main.rs index 1a2847439..9a45980ef 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -365,6 +365,7 @@ impl Configuration { println!("New account address:"); let new_address = secret_store.new_account(&password).unwrap(); println!("{:?}", new_address); + return; } if command == "list" { println!("Known addresses:"); From 8a83e27d6a8f2f298e6b0dc44a60f9190e8e6c2a Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 22:55:41 +0400 Subject: [PATCH 17/19] cfg-test for noop verifier --- ethcore/src/verification/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethcore/src/verification/mod.rs b/ethcore/src/verification/mod.rs index 260121989..fe1f406cc 100644 --- a/ethcore/src/verification/mod.rs +++ b/ethcore/src/verification/mod.rs @@ -17,9 +17,11 @@ pub mod verification; pub mod verifier; mod canon_verifier; +#[cfg(test)] mod noop_verifier; pub use self::verification::*; pub use self::verifier::Verifier; pub use self::canon_verifier::CanonVerifier; +#[cfg(test)] pub use self::noop_verifier::NoopVerifier; From accc1db43fc46e3bd2ab425c778dcfaa843bdec8 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Wed, 9 Mar 2016 23:39:36 +0400 Subject: [PATCH 18/19] changing docopt config a bit --- parity/main.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/parity/main.rs b/parity/main.rs index 9a45980ef..92400728d 100644 --- a/parity/main.rs +++ b/parity/main.rs @@ -71,7 +71,7 @@ Parity. Ethereum Client. Usage: parity daemon [options] [ --no-bootstrap | ...
] - parity account + parity account (new | list) parity [options] [ --no-bootstrap | ... ] Protocol Options: @@ -129,9 +129,10 @@ Miscellaneous Options: struct Args { cmd_daemon: bool, cmd_account: bool, + cmd_new: bool, + cmd_list: bool, arg_pid_file: String, arg_enode: Vec, - arg_command: String, flag_chain: String, flag_testnet: bool, flag_datadir: String, @@ -342,17 +343,17 @@ impl Configuration { .unwrap_or_else(|e| die!("Couldn't daemonize; {}", e)); } if self.args.cmd_account { - self.execute_account_cli(&self.args.arg_command); + self.execute_account_cli(); return; } self.execute_client(); } - fn execute_account_cli(&self, command: &str) { + fn execute_account_cli(&self) { use util::keys::store::SecretStore; use rpassword::read_password; let mut secret_store = SecretStore::new(); - if command == "new" { + if self.args.cmd_new { println!("Please note that password is NOT RECOVERABLE."); println!("Type password: "); let password = read_password().unwrap(); @@ -367,7 +368,7 @@ impl Configuration { println!("{:?}", new_address); return; } - if command == "list" { + if self.args.cmd_list { println!("Known addresses:"); for &(addr, _) in secret_store.accounts().unwrap().iter() { println!("{:?}", addr); From d7e729a4eaee966d5ef4ea9b1ac57ac32a0714f9 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 9 Mar 2016 23:55:56 +0100 Subject: [PATCH 19/19] Fixed sync handling large forks --- sync/src/chain.rs | 4 ++-- sync/src/range_collection.rs | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index fe1b559cd..14f6d6344 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -843,8 +843,8 @@ impl ChainSync { self.downloading_bodies.remove(&n); self.downloading_headers.remove(&n); } - self.headers.remove_tail(&start); - self.bodies.remove_tail(&start); + self.headers.remove_from(&start); + self.bodies.remove_from(&start); } /// Request headers from a peer by block hash diff --git 
a/sync/src/range_collection.rs b/sync/src/range_collection.rs index dc2f4e446..9bb5cc522 100644 --- a/sync/src/range_collection.rs +++ b/sync/src/range_collection.rs @@ -42,6 +42,8 @@ pub trait RangeCollection { fn remove_head(&mut self, start: &K); /// Remove all elements >= `start` in the range that contains `start` fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `start` + fn remove_from(&mut self, start: &K); /// Remove all elements >= `tail` fn insert_item(&mut self, key: K, value: V); /// Get an iterator over ranges @@ -137,6 +139,28 @@ impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + } } + /// Remove the element and all following it. + fn remove_from(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.drain(.. index + 1); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.drain(.. index + 1); + } else { + self.drain(.. 
index); + } + }, + } + } + /// Remove range elements up to key fn remove_head(&mut self, key: &K) { if *key == FromUsize::from_usize(0) { @@ -272,5 +296,17 @@ fn test_range() { assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); r.remove_tail(&2); assert_eq!(r.range_iter().next(), None); + + let mut r = ranges.clone(); + r.remove_from(&20); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_from(&17); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); + r.remove_from(&15); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); + r.remove_from(&3); + assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); + r.remove_from(&2); + assert_eq!(r.range_iter().next(), None); }