From d330f0b7b7fa5db1b5891d7c1e4e61136603fed5 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 5 Mar 2016 12:53:54 +0100 Subject: [PATCH] Revert "Transaction Queue integration" --- Cargo.lock | 19 ------ ethcore/src/client.rs | 21 ++----- ethcore/src/service.rs | 2 +- sync/Cargo.toml | 1 - sync/src/chain.rs | 107 ++++++---------------------------- sync/src/lib.rs | 14 ++--- sync/src/tests/chain.rs | 51 ++++++++-------- sync/src/tests/helpers.rs | 61 +++++-------------- sync/src/transaction_queue.rs | 23 +++++--- 9 files changed, 80 insertions(+), 219 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 510e69b59..55ed996ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,14 +146,6 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "deque" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "docopt" version = "0.6.78" @@ -293,7 +285,6 @@ dependencies = [ "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", - "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -664,16 +655,6 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rayon" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "regex" version = "0.1.54" diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index 852ba6a36..858185873 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -138,9 +138,6 @@ pub trait BlockChainClient : Sync + Send { /// Get block total difficulty. fn block_total_difficulty(&self, id: BlockId) -> Option; - /// Get address nonce. - fn nonce(&self, address: &Address) -> U256; - /// Get block hash. fn block_hash(&self, id: BlockId) -> Option; @@ -368,14 +365,18 @@ impl Client where V: Verifier { bad_blocks.insert(header.hash()); continue; } + let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { bad_blocks.insert(header.hash()); break; } + + // Insert block + let closed_block = closed_block.unwrap(); + self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone()); good_blocks.push(header.hash()); - // Are we committing an era? let ancient = if header.number() >= HISTORY { let n = header.number() - HISTORY; let chain = self.chain.read().unwrap(); @@ -385,16 +386,10 @@ impl Client where V: Verifier { }; // Commit results - let closed_block = closed_block.unwrap(); - let receipts = closed_block.block().receipts().clone(); closed_block.drain() .commit(header.number(), &header.hash(), ancient) .expect("State DB commit failed."); - // And update the chain - self.chain.write().unwrap() - .insert_block(&block.bytes, receipts); - self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } @@ -413,7 +408,7 @@ impl Client where V: Verifier { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - retracted: bad_blocks, + bad: bad_blocks, })).unwrap(); } } @@ -586,10 +581,6 @@ impl BlockChainClient for Client where V: Verifier { Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) } - fn nonce(&self, address: &Address) -> U256 { - self.state().nonce(address) - } - fn block_hash(&self, id: BlockId) -> Option { let chain = self.chain.read().unwrap(); Self::block_hash(&chain, id) diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index a80adb0ba..756d02407 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,7 +30,7 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain - retracted: Vec, + bad: Vec, }, /// A block is ready BlockVerified, diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 0097cd47e..f10a772e3 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -17,7 +17,6 @@ time = "0.1.34" rand = "0.3.13" heapsize = "0.3" rustc-serialize = "0.3" -rayon = "0.3.1" [features] default = [] diff --git a/sync/src/chain.rs b/sync/src/chain.rs index ddf30854a..530cfa424 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -30,17 +30,14 @@ /// use util::*; -use rayon::prelude::*; use std::mem::{replace}; -use ethcore::views::{HeaderView, BlockView}; +use ethcore::views::{HeaderView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo}; use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; use ethcore::block::Block; -use ethcore::transaction::SignedTransaction; use io::SyncIo; -use transaction_queue::TransactionQueue; use time; use super::SyncConfig; @@ -212,8 +209,6 @@ pub struct ChainSync { max_download_ahead_blocks: usize, /// Network ID network_id: U256, - /// Transactions Queue - transaction_queue: Mutex, } type RlpResponseResult = Result, PacketDecodeError>; @@ -239,7 +234,6 @@ impl ChainSync { last_send_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, - transaction_queue: Mutex::new(TransactionQueue::new()), } } @@ -298,7 +292,6 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; - self.transaction_queue.lock().unwrap().clear(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -491,7 +484,7 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - if self.current_base_block() < header.number { + if self.current_base_block() < header.number { self.last_imported_block = Some(header.number); self.remove_downloaded_blocks(header.number); } @@ -928,16 +921,8 @@ impl ChainSync { } } /// Called when peer sends us new transactions - fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - let chain = io.chain(); - let item_count = r.item_count(); - trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); - let fetch_latest_nonce = |a : &Address| chain.nonce(a); - for i in 0..item_count { - let tx: SignedTransaction = try!(r.val_at(i)); - self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); - } - Ok(()) + fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + Ok(()) } /// Send Status message @@ -1263,37 +1248,6 @@ impl ChainSync { } self.last_send_block_number = chain.best_block_number; } - - /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { - fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { - let block = chain - .block(BlockId::Hash(hash.clone())) - // Client should send message after commit to db and inserting to chain. - .expect("Expected in-chain blocks."); - let block = BlockView::new(&block); - block.transactions() - } - - - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); - - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - retracted.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); - } } #[cfg(test)] @@ -1434,7 +1388,7 @@ mod tests { #[test] fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let chain_info = client.chain_info(); @@ -1448,7 +1402,7 @@ mod tests { #[test] fn calculates_tree_for_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(15, EachBlockWith::Uncle); + client.add_blocks(15, false); let start = client.block_hash_delta_minus(4); let end = client.block_hash_delta_minus(2); @@ -1465,7 +1419,7 @@ mod tests { #[test] fn sends_new_hashes_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1484,7 +1438,7 @@ mod tests { #[test] fn sends_latest_block_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1502,7 +1456,7 @@ mod tests { #[test] fn handles_peer_new_block_mallformed() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, EachBlockWith::Uncle); + client.add_blocks(10, false); let block_data = get_dummy_block(11, client.chain_info().best_block_hash); @@ -1520,7 +1474,7 @@ mod tests { #[test] fn handles_peer_new_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, EachBlockWith::Uncle); + client.add_blocks(10, false); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); @@ -1538,7 +1492,7 @@ mod tests { #[test] fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, EachBlockWith::Uncle); + client.add_blocks(10, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1554,7 +1508,7 @@ mod tests { #[test] fn handles_peer_new_hashes() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, EachBlockWith::Uncle); + client.add_blocks(10, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1570,7 +1524,7 @@ mod tests { #[test] fn handles_peer_new_hashes_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, EachBlockWith::Uncle); + client.add_blocks(10, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1588,7 +1542,7 @@ mod tests { #[test] fn hashes_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1606,7 +1560,7 @@ mod tests { #[test] fn block_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1619,37 +1573,10 @@ mod tests { assert!(result.is_ok()); } - #[test] - fn should_add_transactions_to_queue() { - // given - let mut client = TestBlockChainClient::new(); - client.add_blocks(98, EachBlockWith::Uncle); - client.add_blocks(1, EachBlockWith::UncleAndTransaction); - client.add_blocks(1, EachBlockWith::Transaction); - let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); - - let good_blocks = vec![client.block_hash_delta_minus(2)]; - let retracted_blocks = vec![client.block_hash_delta_minus(1)]; - - let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); - - // when - sync.chain_new_blocks(&io, &[], &good_blocks); - assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); - assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); - - // then - let status = sync.transaction_queue.lock().unwrap().status(); - assert_eq!(status.pending, 1); - assert_eq!(status.future, 0); - } - #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1673,7 +1600,7 @@ mod tests { #[test] fn returns_requested_block_headers_reverse() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, EachBlockWith::Uncle); + client.add_blocks(100, false); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index d67a09f3b..74541660d 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -54,7 +54,6 @@ extern crate ethcore; extern crate env_logger; extern crate time; extern crate rand; -extern crate rayon; #[macro_use] extern crate heapsize; @@ -71,7 +70,8 @@ use io::NetSyncIo; mod chain; mod io; mod range_collection; -mod transaction_queue; +// TODO [todr] Made public to suppress dead code warnings +pub mod transaction_queue; #[cfg(test)] mod tests; @@ -153,14 +153,8 @@ impl NetworkProtocolHandler for EthSync { } fn message(&self, io: &NetworkContext, message: &SyncMessage) { - match *message { - SyncMessage::BlockVerified => { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); - }, - SyncMessage::NewChainBlocks { ref good, ref retracted } => { - let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); - } + if let SyncMessage::BlockVerified = *message { + self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); } } } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 58f50916e..b01c894a0 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -24,8 +24,8 @@ use super::helpers::*; fn two_peers() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); @@ -35,8 +35,8 @@ fn two_peers() { fn status_after_sync() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); net.sync(); let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); @@ -45,8 +45,8 @@ fn status_after_sync() { #[test] fn takes_few_steps() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, false); + net.peer_mut(2).chain.add_blocks(100, false); let total_steps = net.sync(); assert!(total_steps < 7); } @@ -56,9 +56,8 @@ fn empty_blocks() { ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { - let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle }; - net.peer_mut(1).chain.add_blocks(5, with.clone()); - net.peer_mut(2).chain.add_blocks(5, with); + net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); + net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); } net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); @@ -69,14 +68,14 @@ fn empty_blocks() { fn forked() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle); - net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle); - net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork - net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); - net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); + net.peer_mut(0).chain.add_blocks(300, false); + net.peer_mut(1).chain.add_blocks(300, false); + net.peer_mut(2).chain.add_blocks(300, false); + net.peer_mut(0).chain.add_blocks(100, true); //fork + net.peer_mut(1).chain.add_blocks(200, false); + net.peer_mut(2).chain.add_blocks(200, false); + net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, true); // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); @@ -88,8 +87,8 @@ fn forked() { #[test] fn restart() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, false); + net.peer_mut(2).chain.add_blocks(1000, false); net.sync_steps(8); @@ -110,8 +109,8 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle); - net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, false); + net.peer_mut(1).chain.add_blocks(1, false); net.start(); @@ -124,10 +123,10 @@ fn status_packet() { #[test] fn propagate_hashes() { let mut net = TestNet::new(6); - net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, false); net.sync(); - net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, false); net.sync(); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -150,10 +149,10 @@ fn propagate_hashes() { #[test] fn propagate_blocks() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, false); net.sync(); - net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, false); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -165,7 +164,7 @@ fn propagate_blocks() { #[test] fn restart_on_malformed_block() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, false); net.peer_mut(1).chain.corrupt_block(6); net.sync_steps(10); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 5b53ad90b..e170a4a85 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -22,7 +22,7 @@ use io::SyncIo; use chain::ChainSync; use ::SyncConfig; use ethcore::receipt::Receipt; -use ethcore::transaction::{LocalizedTransaction, Transaction, Action}; +use ethcore::transaction::LocalizedTransaction; use ethcore::filter::Filter; use ethcore::log_entry::LocalizedLogEntry; @@ -34,14 +34,6 @@ pub struct TestBlockChainClient { pub difficulty: RwLock, } -#[derive(Clone)] -pub enum EachBlockWith { - Nothing, - Uncle, - Transaction, - UncleAndTransaction -} - impl TestBlockChainClient { pub fn new() -> TestBlockChainClient { @@ -52,53 +44,30 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), }; - client.add_blocks(1, EachBlockWith::Nothing); // add genesis block + client.add_blocks(1, true); // add genesis block client.genesis_hash = client.last_hash.read().unwrap().clone(); client } - pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { + pub fn add_blocks(&mut self, count: usize, empty: bool) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); header.parent_hash = self.last_hash.read().unwrap().clone(); header.number = n as BlockNumber; - let uncles = match with { - EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { - let mut uncles = RlpStream::new_list(1); - let mut uncle_header = BlockHeader::new(); - uncle_header.difficulty = From::from(n); - uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); - uncle_header.number = n as BlockNumber; - uncles.append(&uncle_header); - header.uncles_hash = uncles.as_raw().sha3(); - uncles - }, - _ => RlpStream::new_list(0) - }; - let txs = match with { - EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { - let mut txs = RlpStream::new_list(1); - let keypair = KeyPair::create().unwrap(); - let tx = Transaction { - action: Action::Create, - value: U256::from(100), - data: "3331600055".from_hex().unwrap(), - gas: U256::from(100_000), - gas_price: U256::one(), - nonce: U256::zero() - }; - let signed_tx = tx.sign(&keypair.secret()); - txs.append(&signed_tx); - txs.out() - }, - _ => rlp::NULL_RLP.to_vec() - }; - + let mut uncles = RlpStream::new_list(if empty {0} else {1}); + if !empty { + let mut uncle_header = BlockHeader::new(); + uncle_header.difficulty = From::from(n); + uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.number = n as BlockNumber; + uncles.append(&uncle_header); + header.uncles_hash = uncles.as_raw().sha3(); + } let mut rlp = RlpStream::new_list(3); rlp.append(&header); - rlp.append_raw(&txs, 1); + rlp.append_raw(&rlp::NULL_RLP, 1); rlp.append_raw(uncles.as_raw(), 1); self.import_block(rlp.as_raw().to_vec()).unwrap(); } @@ -140,10 +109,6 @@ impl BlockChainClient for TestBlockChainClient { unimplemented!(); } - fn nonce(&self, _address: &Address) -> U256 { - U256::zero() - } - fn code(&self, _address: &Address) -> Option { unimplemented!(); } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 83665dfda..4f5622a2f 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -219,19 +219,19 @@ impl TransactionQueue { /// Removes all transactions identified by hashes given in slice /// /// If gap is introduced marks subsequent transactions as future - pub fn remove_all(&mut self, transaction_hashes: &[H256], fetch_nonce: T) + pub fn remove_all(&mut self, txs: &[H256], fetch_nonce: T) where T: Fn(&Address) -> U256 { - for hash in transaction_hashes { - self.remove(&hash, &fetch_nonce); + for tx in txs { + self.remove(&tx, &fetch_nonce); } } /// Removes transaction identified by hashes from queue. /// /// If gap is introduced marks subsequent transactions as future - pub fn remove(&mut self, transaction_hash: &H256, fetch_nonce: &T) + pub fn remove(&mut self, hash: &H256, fetch_nonce: &T) where T: Fn(&Address) -> U256 { - let transaction = self.by_hash.remove(transaction_hash); + let transaction = self.by_hash.remove(hash); if transaction.is_none() { // We don't know this transaction return; @@ -240,6 +240,7 @@ impl TransactionQueue { let sender = transaction.sender(); let nonce = transaction.nonce(); + println!("Removing tx: {:?}", transaction.transaction); // Remove from future self.future.drop(&sender, &nonce); @@ -265,6 +266,7 @@ impl TransactionQueue { // Goes to future or is removed let order = self.current.drop(&sender, &k).unwrap(); if k >= current_nonce { + println!("Moving to future: {:?}", order); self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); } else { self.by_hash.remove(&order.hash); @@ -274,7 +276,7 @@ impl TransactionQueue { // And now lets check if there is some chain of transactions in future // that should be placed in current - if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce, current_nonce) { + if let Some(new_current_top) = self.move_future_txs(sender.clone(), current_nonce - U256::one(), current_nonce) { self.last_nonces.insert(sender, new_current_top); } } @@ -297,7 +299,9 @@ impl TransactionQueue { self.last_nonces.clear(); } - fn move_future_txs(&mut self, address: Address, mut current_nonce: U256, first_nonce: U256) -> Option { + fn move_future_txs(&mut self, address: Address, current_nonce: U256, first_nonce: U256) -> Option { + println!("Moving from future for: {:?} base: {:?}", current_nonce, first_nonce); + let mut current_nonce = current_nonce + U256::one(); { let by_nonce = self.future.by_address.row_mut(&address); if let None = by_nonce { @@ -308,6 +312,7 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current + println!("Moved: {:?}", order); let order = order.update_height(current_nonce.clone(), first_nonce); self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); @@ -328,6 +333,7 @@ impl TransactionQueue { .cloned() .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); + println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { let order = TransactionOrder::for_transaction(&tx, next_nonce); @@ -339,7 +345,6 @@ impl TransactionQueue { return; } else if next_nonce > nonce { // Droping transaction - trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } @@ -351,7 +356,7 @@ impl TransactionQueue { // Insert to current self.current.insert(address.clone(), nonce, order); // But maybe there are some more items waiting in future? - let new_last_nonce = self.move_future_txs(address.clone(), nonce + U256::one(), base_nonce); + let new_last_nonce = self.move_future_txs(address.clone(), nonce, base_nonce); self.last_nonces.insert(address.clone(), new_last_nonce.unwrap_or(nonce)); // Enforce limit self.current.enforce_limit(&self.by_hash);