Merge pull request #595 from ethcore/tx_queue_integration
Transaction Queue integration
This commit is contained in:
		
						commit
						c4fe307b06
					
				
							
								
								
									
										19
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										19
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -146,6 +146,14 @@ dependencies = [ | |||||||
|  "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", |  "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "deque" | ||||||
|  | version = "0.3.1" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | dependencies = [ | ||||||
|  |  "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "docopt" | name = "docopt" | ||||||
| version = "0.6.78" | version = "0.6.78" | ||||||
| @ -285,6 +293,7 @@ dependencies = [ | |||||||
|  "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", |  "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", |  "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", |  "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  |  "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", |  "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", |  "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
| ] | ] | ||||||
| @ -655,6 +664,16 @@ dependencies = [ | |||||||
|  "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", |  "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
|  | [[package]] | ||||||
|  | name = "rayon" | ||||||
|  | version = "0.3.1" | ||||||
|  | source = "registry+https://github.com/rust-lang/crates.io-index" | ||||||
|  | dependencies = [ | ||||||
|  |  "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  |  "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  |  "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| [[package]] | [[package]] | ||||||
| name = "regex" | name = "regex" | ||||||
| version = "0.1.54" | version = "0.1.54" | ||||||
|  | |||||||
| @ -138,6 +138,9 @@ pub trait BlockChainClient : Sync + Send { | |||||||
| 	/// Get block total difficulty.
 | 	/// Get block total difficulty.
 | ||||||
| 	fn block_total_difficulty(&self, id: BlockId) -> Option<U256>; | 	fn block_total_difficulty(&self, id: BlockId) -> Option<U256>; | ||||||
| 
 | 
 | ||||||
|  | 	/// Get address nonce.
 | ||||||
|  | 	fn nonce(&self, address: &Address) -> U256; | ||||||
|  | 
 | ||||||
| 	/// Get block hash.
 | 	/// Get block hash.
 | ||||||
| 	fn block_hash(&self, id: BlockId) -> Option<H256>; | 	fn block_hash(&self, id: BlockId) -> Option<H256>; | ||||||
| 
 | 
 | ||||||
| @ -365,18 +368,14 @@ impl<V> Client<V> where V: Verifier { | |||||||
| 				bad_blocks.insert(header.hash()); | 				bad_blocks.insert(header.hash()); | ||||||
| 				continue; | 				continue; | ||||||
| 			} | 			} | ||||||
| 
 |  | ||||||
| 			let closed_block = self.check_and_close_block(&block); | 			let closed_block = self.check_and_close_block(&block); | ||||||
| 			if let Err(_) = closed_block { | 			if let Err(_) = closed_block { | ||||||
| 				bad_blocks.insert(header.hash()); | 				bad_blocks.insert(header.hash()); | ||||||
| 				break; | 				break; | ||||||
| 			} | 			} | ||||||
| 
 |  | ||||||
| 			// Insert block
 |  | ||||||
| 			let closed_block = closed_block.unwrap(); |  | ||||||
| 			self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone()); |  | ||||||
| 			good_blocks.push(header.hash()); | 			good_blocks.push(header.hash()); | ||||||
| 
 | 
 | ||||||
|  | 			// Are we committing an era?
 | ||||||
| 			let ancient = if header.number() >= HISTORY { | 			let ancient = if header.number() >= HISTORY { | ||||||
| 				let n = header.number() - HISTORY; | 				let n = header.number() - HISTORY; | ||||||
| 				let chain = self.chain.read().unwrap(); | 				let chain = self.chain.read().unwrap(); | ||||||
| @ -386,10 +385,16 @@ impl<V> Client<V> where V: Verifier { | |||||||
| 			}; | 			}; | ||||||
| 
 | 
 | ||||||
| 			// Commit results
 | 			// Commit results
 | ||||||
|  | 			let closed_block = closed_block.unwrap(); | ||||||
|  | 			let receipts = closed_block.block().receipts().clone(); | ||||||
| 			closed_block.drain() | 			closed_block.drain() | ||||||
| 				.commit(header.number(), &header.hash(), ancient) | 				.commit(header.number(), &header.hash(), ancient) | ||||||
| 				.expect("State DB commit failed."); | 				.expect("State DB commit failed."); | ||||||
| 
 | 
 | ||||||
|  | 			// And update the chain
 | ||||||
|  | 			self.chain.write().unwrap() | ||||||
|  | 				.insert_block(&block.bytes, receipts); | ||||||
|  | 
 | ||||||
| 			self.report.write().unwrap().accrue_block(&block); | 			self.report.write().unwrap().accrue_block(&block); | ||||||
| 			trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); | 			trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); | ||||||
| 		} | 		} | ||||||
| @ -408,7 +413,7 @@ impl<V> Client<V> where V: Verifier { | |||||||
| 			if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { | 			if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { | ||||||
| 				io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { | 				io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { | ||||||
| 					good: good_blocks, | 					good: good_blocks, | ||||||
| 					bad: bad_blocks, | 					retracted: bad_blocks, | ||||||
| 				})).unwrap(); | 				})).unwrap(); | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @ -581,6 +586,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier { | |||||||
| 		Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) | 		Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	fn nonce(&self, address: &Address) -> U256 { | ||||||
|  | 		self.state().nonce(address) | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	fn block_hash(&self, id: BlockId) -> Option<H256> { | 	fn block_hash(&self, id: BlockId) -> Option<H256> { | ||||||
| 		let chain = self.chain.read().unwrap(); | 		let chain = self.chain.read().unwrap(); | ||||||
| 		Self::block_hash(&chain, id) | 		Self::block_hash(&chain, id) | ||||||
|  | |||||||
| @ -30,7 +30,7 @@ pub enum SyncMessage { | |||||||
| 		/// Hashes of blocks imported to blockchain
 | 		/// Hashes of blocks imported to blockchain
 | ||||||
| 		good: Vec<H256>, | 		good: Vec<H256>, | ||||||
| 		/// Hashes of blocks not imported to blockchain
 | 		/// Hashes of blocks not imported to blockchain
 | ||||||
| 		bad: Vec<H256>, | 		retracted: Vec<H256>, | ||||||
| 	}, | 	}, | ||||||
| 	/// A block is ready
 | 	/// A block is ready
 | ||||||
| 	BlockVerified, | 	BlockVerified, | ||||||
|  | |||||||
| @ -17,6 +17,7 @@ time = "0.1.34" | |||||||
| rand = "0.3.13" | rand = "0.3.13" | ||||||
| heapsize = "0.3" | heapsize = "0.3" | ||||||
| rustc-serialize = "0.3" | rustc-serialize = "0.3" | ||||||
|  | rayon = "0.3.1" | ||||||
| 
 | 
 | ||||||
| [features] | [features] | ||||||
| default = [] | default = [] | ||||||
|  | |||||||
| @ -30,14 +30,17 @@ | |||||||
| ///
 | ///
 | ||||||
| 
 | 
 | ||||||
| use util::*; | use util::*; | ||||||
|  | use rayon::prelude::*; | ||||||
| use std::mem::{replace}; | use std::mem::{replace}; | ||||||
| use ethcore::views::{HeaderView}; | use ethcore::views::{HeaderView, BlockView}; | ||||||
| use ethcore::header::{BlockNumber, Header as BlockHeader}; | use ethcore::header::{BlockNumber, Header as BlockHeader}; | ||||||
| use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; | use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; | ||||||
| use range_collection::{RangeCollection, ToUsize, FromUsize}; | use range_collection::{RangeCollection, ToUsize, FromUsize}; | ||||||
| use ethcore::error::*; | use ethcore::error::*; | ||||||
| use ethcore::block::Block; | use ethcore::block::Block; | ||||||
|  | use ethcore::transaction::SignedTransaction; | ||||||
| use io::SyncIo; | use io::SyncIo; | ||||||
|  | use transaction_queue::TransactionQueue; | ||||||
| use time; | use time; | ||||||
| use super::SyncConfig; | use super::SyncConfig; | ||||||
| 
 | 
 | ||||||
| @ -209,6 +212,8 @@ pub struct ChainSync { | |||||||
| 	max_download_ahead_blocks: usize, | 	max_download_ahead_blocks: usize, | ||||||
| 	/// Network ID
 | 	/// Network ID
 | ||||||
| 	network_id: U256, | 	network_id: U256, | ||||||
|  | 	/// Transactions Queue
 | ||||||
|  | 	transaction_queue: Mutex<TransactionQueue>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; | type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>; | ||||||
| @ -234,6 +239,7 @@ impl ChainSync { | |||||||
| 			last_send_block_number: 0, | 			last_send_block_number: 0, | ||||||
| 			max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), | 			max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), | ||||||
| 			network_id: config.network_id, | 			network_id: config.network_id, | ||||||
|  | 			transaction_queue: Mutex::new(TransactionQueue::new()), | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -292,6 +298,7 @@ impl ChainSync { | |||||||
| 		self.starting_block = 0; | 		self.starting_block = 0; | ||||||
| 		self.highest_block = None; | 		self.highest_block = None; | ||||||
| 		self.have_common_block = false; | 		self.have_common_block = false; | ||||||
|  | 		self.transaction_queue.lock().unwrap().clear(); | ||||||
| 		self.starting_block = io.chain().chain_info().best_block_number; | 		self.starting_block = io.chain().chain_info().best_block_number; | ||||||
| 		self.state = SyncState::NotSynced; | 		self.state = SyncState::NotSynced; | ||||||
| 	} | 	} | ||||||
| @ -921,7 +928,15 @@ impl ChainSync { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	/// Called when peer sends us new transactions
 | 	/// Called when peer sends us new transactions
 | ||||||
| 	fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { | 	fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { | ||||||
|  | 		let chain = io.chain(); | ||||||
|  | 		let item_count = r.item_count(); | ||||||
|  | 		trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); | ||||||
|  | 		let fetch_latest_nonce = |a : &Address| chain.nonce(a); | ||||||
|  | 		for i in 0..item_count { | ||||||
|  | 			let tx: SignedTransaction = try!(r.val_at(i)); | ||||||
|  | 			self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); | ||||||
|  | 		} | ||||||
|  		Ok(()) |  		Ok(()) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| @ -1248,6 +1263,37 @@ impl ChainSync { | |||||||
| 		} | 		} | ||||||
| 		self.last_send_block_number = chain.best_block_number; | 		self.last_send_block_number = chain.best_block_number; | ||||||
| 	} | 	} | ||||||
|  | 
 | ||||||
|  | 	/// called when block is imported to chain, updates transactions queue
 | ||||||
|  | 	pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { | ||||||
|  | 		fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> { | ||||||
|  | 			let block = chain | ||||||
|  | 				.block(BlockId::Hash(hash.clone())) | ||||||
|  | 				// Client should send message after commit to db and inserting to chain.
 | ||||||
|  | 				.expect("Expected in-chain blocks."); | ||||||
|  | 			let block = BlockView::new(&block); | ||||||
|  | 			block.transactions() | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | 		let chain = io.chain(); | ||||||
|  | 		let good = good.par_iter().map(|h| fetch_transactions(chain, h)); | ||||||
|  | 		let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); | ||||||
|  | 
 | ||||||
|  | 		good.for_each(|txs| { | ||||||
|  | 			let mut transaction_queue = self.transaction_queue.lock().unwrap(); | ||||||
|  | 			let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>(); | ||||||
|  | 			transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); | ||||||
|  | 		}); | ||||||
|  | 		retracted.for_each(|txs| { | ||||||
|  | 			// populate sender
 | ||||||
|  | 			for tx in &txs { | ||||||
|  | 				let _sender = tx.sender(); | ||||||
|  | 			} | ||||||
|  | 			let mut transaction_queue = self.transaction_queue.lock().unwrap(); | ||||||
|  | 			transaction_queue.add_all(txs, |a| chain.nonce(a)); | ||||||
|  | 		}); | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| @ -1388,7 +1434,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn finds_lagging_peers() { | 	fn finds_lagging_peers() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); | ||||||
| 		let chain_info = client.chain_info(); | 		let chain_info = client.chain_info(); | ||||||
| @ -1402,7 +1448,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn calculates_tree_for_lagging_peer() { | 	fn calculates_tree_for_lagging_peer() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(15, false); | 		client.add_blocks(15, EachBlockWith::Uncle); | ||||||
| 
 | 
 | ||||||
| 		let start = client.block_hash_delta_minus(4); | 		let start = client.block_hash_delta_minus(4); | ||||||
| 		let end = client.block_hash_delta_minus(2); | 		let end = client.block_hash_delta_minus(2); | ||||||
| @ -1419,7 +1465,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn sends_new_hashes_to_lagging_peer() { | 	fn sends_new_hashes_to_lagging_peer() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let chain_info = client.chain_info(); | 		let chain_info = client.chain_info(); | ||||||
| @ -1438,7 +1484,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn sends_latest_block_to_lagging_peer() { | 	fn sends_latest_block_to_lagging_peer() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let chain_info = client.chain_info(); | 		let chain_info = client.chain_info(); | ||||||
| @ -1456,7 +1502,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn handles_peer_new_block_mallformed() { | 	fn handles_peer_new_block_mallformed() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(10, false); | 		client.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 
 | 
 | ||||||
| 		let block_data = get_dummy_block(11, client.chain_info().best_block_hash); | 		let block_data = get_dummy_block(11, client.chain_info().best_block_hash); | ||||||
| 
 | 
 | ||||||
| @ -1474,7 +1520,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn handles_peer_new_block() { | 	fn handles_peer_new_block() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(10, false); | 		client.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 
 | 
 | ||||||
| 		let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); | 		let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); | ||||||
| 
 | 
 | ||||||
| @ -1492,7 +1538,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn handles_peer_new_block_empty() { | 	fn handles_peer_new_block_empty() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(10, false); | 		client.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let mut io = TestIo::new(&mut client, &mut queue, None); | 		let mut io = TestIo::new(&mut client, &mut queue, None); | ||||||
| @ -1508,7 +1554,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn handles_peer_new_hashes() { | 	fn handles_peer_new_hashes() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(10, false); | 		client.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let mut io = TestIo::new(&mut client, &mut queue, None); | 		let mut io = TestIo::new(&mut client, &mut queue, None); | ||||||
| @ -1524,7 +1570,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn handles_peer_new_hashes_empty() { | 	fn handles_peer_new_hashes_empty() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(10, false); | 		client.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let mut io = TestIo::new(&mut client, &mut queue, None); | 		let mut io = TestIo::new(&mut client, &mut queue, None); | ||||||
| @ -1542,7 +1588,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn hashes_rlp_mutually_acceptable() { | 	fn hashes_rlp_mutually_acceptable() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let chain_info = client.chain_info(); | 		let chain_info = client.chain_info(); | ||||||
| @ -1560,7 +1606,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn block_rlp_mutually_acceptable() { | 	fn block_rlp_mutually_acceptable() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
| 		let chain_info = client.chain_info(); | 		let chain_info = client.chain_info(); | ||||||
| @ -1573,10 +1619,37 @@ mod tests { | |||||||
| 		assert!(result.is_ok()); | 		assert!(result.is_ok()); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	#[test] | ||||||
|  | 	fn should_add_transactions_to_queue() { | ||||||
|  | 		// given
 | ||||||
|  | 		let mut client = TestBlockChainClient::new(); | ||||||
|  | 		client.add_blocks(98, EachBlockWith::Uncle); | ||||||
|  | 		client.add_blocks(1, EachBlockWith::UncleAndTransaction); | ||||||
|  | 		client.add_blocks(1, EachBlockWith::Transaction); | ||||||
|  | 		let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); | ||||||
|  | 
 | ||||||
|  | 		let good_blocks = vec![client.block_hash_delta_minus(2)]; | ||||||
|  | 		let retracted_blocks = vec![client.block_hash_delta_minus(1)]; | ||||||
|  | 
 | ||||||
|  | 		let mut queue = VecDeque::new(); | ||||||
|  | 		let io = TestIo::new(&mut client, &mut queue, None); | ||||||
|  | 
 | ||||||
|  | 		// when
 | ||||||
|  | 		sync.chain_new_blocks(&io, &[], &good_blocks); | ||||||
|  | 		assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); | ||||||
|  | 		assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); | ||||||
|  | 		sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); | ||||||
|  | 
 | ||||||
|  | 		// then
 | ||||||
|  | 		let status = sync.transaction_queue.lock().unwrap().status(); | ||||||
|  | 		assert_eq!(status.pending, 1); | ||||||
|  | 		assert_eq!(status.future, 0); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	#[test] | 	#[test] | ||||||
| 	fn returns_requested_block_headers() { | 	fn returns_requested_block_headers() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let io = TestIo::new(&mut client, &mut queue, None); | 		let io = TestIo::new(&mut client, &mut queue, None); | ||||||
| 
 | 
 | ||||||
| @ -1600,7 +1673,7 @@ mod tests { | |||||||
| 	#[test] | 	#[test] | ||||||
| 	fn returns_requested_block_headers_reverse() { | 	fn returns_requested_block_headers_reverse() { | ||||||
| 		let mut client = TestBlockChainClient::new(); | 		let mut client = TestBlockChainClient::new(); | ||||||
| 		client.add_blocks(100, false); | 		client.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 		let mut queue = VecDeque::new(); | 		let mut queue = VecDeque::new(); | ||||||
| 		let io = TestIo::new(&mut client, &mut queue, None); | 		let io = TestIo::new(&mut client, &mut queue, None); | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -54,6 +54,7 @@ extern crate ethcore; | |||||||
| extern crate env_logger; | extern crate env_logger; | ||||||
| extern crate time; | extern crate time; | ||||||
| extern crate rand; | extern crate rand; | ||||||
|  | extern crate rayon; | ||||||
| #[macro_use] | #[macro_use] | ||||||
| extern crate heapsize; | extern crate heapsize; | ||||||
| 
 | 
 | ||||||
| @ -70,8 +71,7 @@ use io::NetSyncIo; | |||||||
| mod chain; | mod chain; | ||||||
| mod io; | mod io; | ||||||
| mod range_collection; | mod range_collection; | ||||||
| // TODO [todr] Made public to suppress dead code warnings
 | mod transaction_queue; | ||||||
| pub mod transaction_queue; |  | ||||||
| 
 | 
 | ||||||
| #[cfg(test)] | #[cfg(test)] | ||||||
| mod tests; | mod tests; | ||||||
| @ -153,8 +153,14 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync { | |||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) { | 	fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) { | ||||||
| 		if let SyncMessage::BlockVerified = *message { | 		match *message { | ||||||
|  | 			SyncMessage::BlockVerified => { | ||||||
| 				self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); | 				self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); | ||||||
|  | 			}, | ||||||
|  | 			SyncMessage::NewChainBlocks { ref good, ref retracted } => { | ||||||
|  | 				let sync_io = NetSyncIo::new(io, self.chain.deref()); | ||||||
|  | 				self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  | |||||||
| @ -24,8 +24,8 @@ use super::helpers::*; | |||||||
| fn two_peers() { | fn two_peers() { | ||||||
| 	::env_logger::init().ok(); | 	::env_logger::init().ok(); | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	net.peer_mut(1).chain.add_blocks(1000, false); | 	net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(1000, false); | 	net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 	assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); | 	assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); | ||||||
| 	assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); | 	assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); | ||||||
| @ -35,8 +35,8 @@ fn two_peers() { | |||||||
| fn status_after_sync() { | fn status_after_sync() { | ||||||
| 	::env_logger::init().ok(); | 	::env_logger::init().ok(); | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	net.peer_mut(1).chain.add_blocks(1000, false); | 	net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(1000, false); | 	net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 	let status = net.peer(0).sync.status(); | 	let status = net.peer(0).sync.status(); | ||||||
| 	assert_eq!(status.state, SyncState::Idle); | 	assert_eq!(status.state, SyncState::Idle); | ||||||
| @ -45,8 +45,8 @@ fn status_after_sync() { | |||||||
| #[test] | #[test] | ||||||
| fn takes_few_steps() { | fn takes_few_steps() { | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	net.peer_mut(1).chain.add_blocks(100, false); | 	net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(100, false); | 	net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 	let total_steps = net.sync(); | 	let total_steps = net.sync(); | ||||||
| 	assert!(total_steps < 7); | 	assert!(total_steps < 7); | ||||||
| } | } | ||||||
| @ -56,8 +56,9 @@ fn empty_blocks() { | |||||||
| 	::env_logger::init().ok(); | 	::env_logger::init().ok(); | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	for n in 0..200 { | 	for n in 0..200 { | ||||||
| 		net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); | 		let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle }; | ||||||
| 		net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); | 		net.peer_mut(1).chain.add_blocks(5, with.clone()); | ||||||
|  | 		net.peer_mut(2).chain.add_blocks(5, with); | ||||||
| 	} | 	} | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 	assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); | 	assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); | ||||||
| @ -68,14 +69,14 @@ fn empty_blocks() { | |||||||
| fn forked() { | fn forked() { | ||||||
| 	::env_logger::init().ok(); | 	::env_logger::init().ok(); | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	net.peer_mut(0).chain.add_blocks(300, false); | 	net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(1).chain.add_blocks(300, false); | 	net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(300, false); | 	net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(0).chain.add_blocks(100, true); //fork
 | 	net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork
 | ||||||
| 	net.peer_mut(1).chain.add_blocks(200, false); | 	net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(200, false); | 	net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2
 | 	net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2
 | ||||||
| 	net.peer_mut(2).chain.add_blocks(10, true); | 	net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); | ||||||
| 	// peer 1 has the best chain of 601 blocks
 | 	// peer 1 has the best chain of 601 blocks
 | ||||||
| 	let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); | 	let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| @ -87,8 +88,8 @@ fn forked() { | |||||||
| #[test] | #[test] | ||||||
| fn restart() { | fn restart() { | ||||||
| 	let mut net = TestNet::new(3); | 	let mut net = TestNet::new(3); | ||||||
| 	net.peer_mut(1).chain.add_blocks(1000, false); | 	net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(2).chain.add_blocks(1000, false); | 	net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); | ||||||
| 
 | 
 | ||||||
| 	net.sync_steps(8); | 	net.sync_steps(8); | ||||||
| 
 | 
 | ||||||
| @ -109,8 +110,8 @@ fn status_empty() { | |||||||
| #[test] | #[test] | ||||||
| fn status_packet() { | fn status_packet() { | ||||||
| 	let mut net = TestNet::new(2); | 	let mut net = TestNet::new(2); | ||||||
| 	net.peer_mut(0).chain.add_blocks(100, false); | 	net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(1).chain.add_blocks(1, false); | 	net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle); | ||||||
| 
 | 
 | ||||||
| 	net.start(); | 	net.start(); | ||||||
| 
 | 
 | ||||||
| @ -123,10 +124,10 @@ fn status_packet() { | |||||||
| #[test] | #[test] | ||||||
| fn propagate_hashes() { | fn propagate_hashes() { | ||||||
| 	let mut net = TestNet::new(6); | 	let mut net = TestNet::new(6); | ||||||
| 	net.peer_mut(1).chain.add_blocks(10, false); | 	net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 
 | 
 | ||||||
| 	net.peer_mut(0).chain.add_blocks(10, false); | 	net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 	net.trigger_block_verified(0); //first event just sets the marker
 | 	net.trigger_block_verified(0); //first event just sets the marker
 | ||||||
| 	net.trigger_block_verified(0); | 	net.trigger_block_verified(0); | ||||||
| @ -149,10 +150,10 @@ fn propagate_hashes() { | |||||||
| #[test] | #[test] | ||||||
| fn propagate_blocks() { | fn propagate_blocks() { | ||||||
| 	let mut net = TestNet::new(2); | 	let mut net = TestNet::new(2); | ||||||
| 	net.peer_mut(1).chain.add_blocks(10, false); | 	net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 	net.sync(); | 	net.sync(); | ||||||
| 
 | 
 | ||||||
| 	net.peer_mut(0).chain.add_blocks(10, false); | 	net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 	net.trigger_block_verified(0); //first event just sets the marker
 | 	net.trigger_block_verified(0); //first event just sets the marker
 | ||||||
| 	net.trigger_block_verified(0); | 	net.trigger_block_verified(0); | ||||||
| 
 | 
 | ||||||
| @ -164,7 +165,7 @@ fn propagate_blocks() { | |||||||
| #[test] | #[test] | ||||||
| fn restart_on_malformed_block() { | fn restart_on_malformed_block() { | ||||||
| 	let mut net = TestNet::new(2); | 	let mut net = TestNet::new(2); | ||||||
| 	net.peer_mut(1).chain.add_blocks(10, false); | 	net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); | ||||||
| 	net.peer_mut(1).chain.corrupt_block(6); | 	net.peer_mut(1).chain.corrupt_block(6); | ||||||
| 	net.sync_steps(10); | 	net.sync_steps(10); | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -22,7 +22,7 @@ use io::SyncIo; | |||||||
| use chain::ChainSync; | use chain::ChainSync; | ||||||
| use ::SyncConfig; | use ::SyncConfig; | ||||||
| use ethcore::receipt::Receipt; | use ethcore::receipt::Receipt; | ||||||
| use ethcore::transaction::LocalizedTransaction; | use ethcore::transaction::{LocalizedTransaction, Transaction, Action}; | ||||||
| use ethcore::filter::Filter; | use ethcore::filter::Filter; | ||||||
| use ethcore::log_entry::LocalizedLogEntry; | use ethcore::log_entry::LocalizedLogEntry; | ||||||
| 
 | 
 | ||||||
| @ -34,6 +34,14 @@ pub struct TestBlockChainClient { | |||||||
| 	pub difficulty: RwLock<U256>, | 	pub difficulty: RwLock<U256>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #[derive(Clone)] | ||||||
|  | pub enum EachBlockWith { | ||||||
|  | 	Nothing, | ||||||
|  | 	Uncle, | ||||||
|  | 	Transaction, | ||||||
|  | 	UncleAndTransaction | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl TestBlockChainClient { | impl TestBlockChainClient { | ||||||
| 	pub fn new() -> TestBlockChainClient { | 	pub fn new() -> TestBlockChainClient { | ||||||
| 
 | 
 | ||||||
| @ -44,30 +52,53 @@ impl TestBlockChainClient { | |||||||
| 			last_hash: RwLock::new(H256::new()), | 			last_hash: RwLock::new(H256::new()), | ||||||
| 			difficulty: RwLock::new(From::from(0)), | 			difficulty: RwLock::new(From::from(0)), | ||||||
| 		}; | 		}; | ||||||
| 		client.add_blocks(1, true); // add genesis block
 | 		client.add_blocks(1, EachBlockWith::Nothing); // add genesis block
 | ||||||
| 		client.genesis_hash = client.last_hash.read().unwrap().clone(); | 		client.genesis_hash = client.last_hash.read().unwrap().clone(); | ||||||
| 		client | 		client | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	pub fn add_blocks(&mut self, count: usize, empty: bool) { | 	pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { | ||||||
| 		let len = self.numbers.read().unwrap().len(); | 		let len = self.numbers.read().unwrap().len(); | ||||||
| 		for n in len..(len + count) { | 		for n in len..(len + count) { | ||||||
| 			let mut header = BlockHeader::new(); | 			let mut header = BlockHeader::new(); | ||||||
| 			header.difficulty = From::from(n); | 			header.difficulty = From::from(n); | ||||||
| 			header.parent_hash = self.last_hash.read().unwrap().clone(); | 			header.parent_hash = self.last_hash.read().unwrap().clone(); | ||||||
| 			header.number = n as BlockNumber; | 			header.number = n as BlockNumber; | ||||||
| 			let mut uncles = RlpStream::new_list(if empty {0} else {1}); | 			let uncles = match with { | ||||||
| 			if !empty { | 				EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { | ||||||
|  | 					let mut uncles = RlpStream::new_list(1); | ||||||
| 					let mut uncle_header = BlockHeader::new(); | 					let mut uncle_header = BlockHeader::new(); | ||||||
| 					uncle_header.difficulty = From::from(n); | 					uncle_header.difficulty = From::from(n); | ||||||
| 					uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); | 					uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); | ||||||
| 					uncle_header.number = n as BlockNumber; | 					uncle_header.number = n as BlockNumber; | ||||||
| 					uncles.append(&uncle_header); | 					uncles.append(&uncle_header); | ||||||
| 					header.uncles_hash = uncles.as_raw().sha3(); | 					header.uncles_hash = uncles.as_raw().sha3(); | ||||||
| 			} | 					uncles | ||||||
|  | 				}, | ||||||
|  | 				_ => RlpStream::new_list(0) | ||||||
|  | 			}; | ||||||
|  | 			let txs = match with { | ||||||
|  | 				EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { | ||||||
|  | 					let mut txs = RlpStream::new_list(1); | ||||||
|  | 					let keypair = KeyPair::create().unwrap(); | ||||||
|  | 					let tx = Transaction { | ||||||
|  | 						action: Action::Create, | ||||||
|  | 						value: U256::from(100), | ||||||
|  | 						data: "3331600055".from_hex().unwrap(), | ||||||
|  | 						gas: U256::from(100_000), | ||||||
|  | 						gas_price: U256::one(), | ||||||
|  | 						nonce: U256::zero() | ||||||
|  | 					}; | ||||||
|  | 					let signed_tx = tx.sign(&keypair.secret()); | ||||||
|  | 					txs.append(&signed_tx); | ||||||
|  | 					txs.out() | ||||||
|  | 				}, | ||||||
|  | 				_ => rlp::NULL_RLP.to_vec() | ||||||
|  | 			}; | ||||||
|  | 
 | ||||||
| 			let mut rlp = RlpStream::new_list(3); | 			let mut rlp = RlpStream::new_list(3); | ||||||
| 			rlp.append(&header); | 			rlp.append(&header); | ||||||
| 			rlp.append_raw(&rlp::NULL_RLP, 1); | 			rlp.append_raw(&txs, 1); | ||||||
| 			rlp.append_raw(uncles.as_raw(), 1); | 			rlp.append_raw(uncles.as_raw(), 1); | ||||||
| 			self.import_block(rlp.as_raw().to_vec()).unwrap(); | 			self.import_block(rlp.as_raw().to_vec()).unwrap(); | ||||||
| 		} | 		} | ||||||
| @ -109,6 +140,10 @@ impl BlockChainClient for TestBlockChainClient { | |||||||
| 		unimplemented!(); | 		unimplemented!(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | 	fn nonce(&self, _address: &Address) -> U256 { | ||||||
|  | 		U256::zero() | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	fn code(&self, _address: &Address) -> Option<Bytes> { | 	fn code(&self, _address: &Address) -> Option<Bytes> { | ||||||
| 		unimplemented!(); | 		unimplemented!(); | ||||||
| 	} | 	} | ||||||
|  | |||||||
| @ -219,19 +219,19 @@ impl TransactionQueue { | |||||||
| 	/// Removes all transactions identified by hashes given in slice
 | 	/// Removes all transactions identified by hashes given in slice
 | ||||||
| 	///
 | 	///
 | ||||||
| 	/// If gap is introduced marks subsequent transactions as future
 | 	/// If gap is introduced marks subsequent transactions as future
 | ||||||
| 	pub fn remove_all<T>(&mut self, txs: &[H256], fetch_nonce: T) | 	pub fn remove_all<T>(&mut self, transaction_hashes: &[H256], fetch_nonce: T) | ||||||
| 		where T: Fn(&Address) -> U256 { | 		where T: Fn(&Address) -> U256 { | ||||||
| 		for tx in txs { | 		for hash in transaction_hashes { | ||||||
| 			self.remove(&tx, &fetch_nonce); | 			self.remove(&hash, &fetch_nonce); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/// Removes transaction identified by hashes from queue.
 | 	/// Removes transaction identified by hashes from queue.
 | ||||||
| 	///
 | 	///
 | ||||||
| 	/// If gap is introduced marks subsequent transactions as future
 | 	/// If gap is introduced marks subsequent transactions as future
 | ||||||
| 	pub fn remove<T>(&mut self, hash: &H256, fetch_nonce: &T) | 	pub fn remove<T>(&mut self, transaction_hash: &H256, fetch_nonce: &T) | ||||||
| 		where T: Fn(&Address) -> U256 { | 		where T: Fn(&Address) -> U256 { | ||||||
| 		let transaction = self.by_hash.remove(hash); | 		let transaction = self.by_hash.remove(transaction_hash); | ||||||
| 		if transaction.is_none() { | 		if transaction.is_none() { | ||||||
| 			// We don't know this transaction
 | 			// We don't know this transaction
 | ||||||
| 			return; | 			return; | ||||||
| @ -240,7 +240,6 @@ impl TransactionQueue { | |||||||
| 		let sender = transaction.sender(); | 		let sender = transaction.sender(); | ||||||
| 		let nonce = transaction.nonce(); | 		let nonce = transaction.nonce(); | ||||||
| 
 | 
 | ||||||
| 		println!("Removing tx: {:?}", transaction.transaction); |  | ||||||
| 		// Remove from future
 | 		// Remove from future
 | ||||||
| 		self.future.drop(&sender, &nonce); | 		self.future.drop(&sender, &nonce); | ||||||
| 
 | 
 | ||||||
| @ -266,7 +265,6 @@ impl TransactionQueue { | |||||||
| 			// Goes to future or is removed
 | 			// Goes to future or is removed
 | ||||||
| 			let order = self.current.drop(&sender, &k).unwrap(); | 			let order = self.current.drop(&sender, &k).unwrap(); | ||||||
| 			if k >= current_nonce { | 			if k >= current_nonce { | ||||||
| 				println!("Moving to future: {:?}", order); |  | ||||||
| 				self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); | 				self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); | ||||||
| 			} else { | 			} else { | ||||||
| 				self.by_hash.remove(&order.hash); | 				self.by_hash.remove(&order.hash); | ||||||
| @ -276,7 +274,7 @@ impl TransactionQueue { | |||||||
| 
 | 
 | ||||||
| 		// And now lets check if there is some chain of transactions in future
 | 		// And now lets check if there is some chain of transactions in future
 | ||||||
| 		// that should be placed in current
 | 		// that should be placed in current
 | ||||||
| 		if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) { | 		if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { | ||||||
| 			self.last_nonces.insert(sender, new_current_top); | 			self.last_nonces.insert(sender, new_current_top); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @ -299,9 +297,7 @@ impl TransactionQueue { | |||||||
| 		self.last_nonces.clear(); | 		self.last_nonces.clear(); | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option<U256> { | 	fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option<U256> { | ||||||
| 		println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce); |  | ||||||
| 		let mut current_nonce = current_nonce + U256::one(); |  | ||||||
| 		{ | 		{ | ||||||
| 			let by_nonce = self.future.by_address.row_mut(&address); | 			let by_nonce = self.future.by_address.row_mut(&address); | ||||||
| 			if let None = by_nonce { | 			if let None = by_nonce { | ||||||
| @ -312,7 +308,6 @@ impl TransactionQueue { | |||||||
| 				// remove also from priority and hash
 | 				// remove also from priority and hash
 | ||||||
| 				self.future.by_priority.remove(&order); | 				self.future.by_priority.remove(&order); | ||||||
| 				// Put to current
 | 				// Put to current
 | ||||||
| 				println!("Moved: {:?}", order); |  | ||||||
| 				let order = order.update_height(current_nonce.clone(), first_nonce); | 				let order = order.update_height(current_nonce.clone(), first_nonce); | ||||||
| 				self.current.insert(address.clone(), current_nonce, order); | 				self.current.insert(address.clone(), current_nonce, order); | ||||||
| 				current_nonce = current_nonce + U256::one(); | 				current_nonce = current_nonce + U256::one(); | ||||||
| @ -333,7 +328,6 @@ impl TransactionQueue { | |||||||
| 			.cloned() | 			.cloned() | ||||||
| 			.map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); | 			.map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); | ||||||
| 
 | 
 | ||||||
| 		println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); |  | ||||||
| 		// Check height
 | 		// Check height
 | ||||||
| 		if nonce > next_nonce { | 		if nonce > next_nonce { | ||||||
| 			let order = TransactionOrder::for_transaction(&tx, next_nonce); | 			let order = TransactionOrder::for_transaction(&tx, next_nonce); | ||||||
| @ -345,6 +339,7 @@ impl TransactionQueue { | |||||||
| 			return; | 			return; | ||||||
| 		} else if next_nonce > nonce { | 		} else if next_nonce > nonce { | ||||||
| 			// Droping transaction
 | 			// Droping transaction
 | ||||||
|  | 			trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); | ||||||
| 			return; | 			return; | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| @ -356,7 +351,7 @@ impl TransactionQueue { | |||||||
| 		// Insert to current
 | 		// Insert to current
 | ||||||
| 		self.current.insert(address.clone(), nonce, order); | 		self.current.insert(address.clone(), nonce, order); | ||||||
| 		// But maybe there are some more items waiting in future?
 | 		// But maybe there are some more items waiting in future?
 | ||||||
| 		let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); | 		let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); | ||||||
| 		self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); | 		self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); | ||||||
| 		// Enforce limit
 | 		// Enforce limit
 | ||||||
| 		self.current.enforce_limit(&self.by_hash); | 		self.current.enforce_limit(&self.by_hash); | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user