From 7565625ce042cfc8ce328f9387d8732da48cc4bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 1 Mar 2016 22:30:23 +0100 Subject: [PATCH 1/5] Integrating TransactionQueue with client --- Cargo.lock | 19 +++++++++ ethcore/src/client.rs | 7 +++ sync/Cargo.toml | 1 + sync/src/chain.rs | 89 ++++++++++++++++++++++++++++++++++----- sync/src/lib.rs | 14 ++++-- sync/src/tests/helpers.rs | 4 ++ 6 files changed, 120 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e558606eb..845c493fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -125,6 +125,14 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "deque" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "docopt" version = "0.6.78" @@ -259,6 +267,7 @@ dependencies = [ "heapsize 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.34 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -629,6 +638,16 @@ dependencies = [ "libc 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rayon" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "deque 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "regex" version = "0.1.54" diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index f2894decb..611561c50 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -123,6 +123,9 @@ pub trait BlockChainClient : Sync + Send { /// Get block total difficulty. fn block_total_difficulty(&self, id: BlockId) -> Option; + /// Get address nonce. + fn nonce(&self, address: &Address) -> U256; + /// Get address code. fn code(&self, address: &Address) -> Option; @@ -445,6 +448,10 @@ impl BlockChainClient for Client { Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty) } + fn nonce(&self, address: &Address) -> U256 { + self.state().nonce(address) + } + fn code(&self, address: &Address) -> Option { self.state().code(address) } diff --git a/sync/Cargo.toml b/sync/Cargo.toml index 2ce65ca77..993f07a65 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -17,6 +17,7 @@ time = "0.1.34" rand = "0.3.13" heapsize = "0.3" rustc-serialize = "0.3" +rayon = "0.3.1" [features] default = [] diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 6a7add27f..fa033813e 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -30,14 +30,17 @@ /// use util::*; +use rayon::prelude::*; use std::mem::{replace}; -use ethcore::views::{HeaderView}; +use ethcore::views::{HeaderView, BlockView}; use ethcore::header::{BlockNumber, Header as BlockHeader}; use ethcore::client::{BlockChainClient, BlockStatus, BlockId}; use range_collection::{RangeCollection, ToUsize, FromUsize}; use ethcore::error::*; use ethcore::block::Block; +use ethcore::transaction::SignedTransaction; use io::SyncIo; +use transaction_queue::TransactionQueue; use time; use super::SyncConfig; @@ -209,6 +212,8 @@ pub struct ChainSync { max_download_ahead_blocks: usize, /// Network ID network_id: U256, + /// Transactions Queue + transaction_queue: Mutex, } type RlpResponseResult = Result, PacketDecodeError>; @@ -234,6 +239,7 @@ impl ChainSync { last_send_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, + transaction_queue: Mutex::new(TransactionQueue::new()), } } @@ -249,14 +255,14 @@ impl ChainSync { blocks_total: match self.highest_block { Some(x) if x > self.starting_block => x - self.starting_block, _ => 0 }, num_peers: self.peers.len(), num_active_peers: self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(), - mem_used: + mem_used: // TODO: https://github.com/servo/heapsize/pull/50 - // self.downloading_hashes.heap_size_of_children() - //+ self.downloading_bodies.heap_size_of_children() - //+ self.downloading_hashes.heap_size_of_children() - self.headers.heap_size_of_children() - + self.bodies.heap_size_of_children() - + self.peers.heap_size_of_children() + // self.downloading_hashes.heap_size_of_children() + //+ self.downloading_bodies.heap_size_of_children() + //+ self.downloading_hashes.heap_size_of_children() + self.headers.heap_size_of_children() + + self.bodies.heap_size_of_children() + + self.peers.heap_size_of_children() + self.header_ids.heap_size_of_children(), } } @@ -292,6 +298,7 @@ impl ChainSync { self.starting_block = 0; self.highest_block = None; self.have_common_block = false; + self.transaction_queue.lock().unwrap().clear(); self.starting_block = io.chain().chain_info().best_block_number; self.state = SyncState::NotSynced; } @@ -913,8 +920,16 @@ impl ChainSync { } } /// Called when peer sends us new transactions - fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> { - Ok(()) + fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> { + let chain = io.chain(); + let item_count = r.item_count(); + trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); + let fetch_latest_nonce = |a : &Address| chain.nonce(a); + for i in 0..item_count { + let tx: SignedTransaction = try!(r.val_at(i)); + self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); + } + Ok(()) } /// Send Status message @@ -1242,6 +1257,34 @@ impl ChainSync { } self.last_send_block_number = chain.best_block_number; } + + /// called when block is imported to chain, updates transactions queue + pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) { + fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { + let block = chain + .block(BlockId::Hash(hash.clone())) + .expect("Expected in-chain blocks."); + let block = BlockView::new(&block); + block.transactions() + }; + + let chain = io.chain(); + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.remove_all(&txs); + }); + bad.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } } #[cfg(test)] @@ -1571,6 +1614,32 @@ mod tests { assert!(result.is_ok()); } + #[test] + fn should_add_transactions_to_queue() { + // given + let mut client = TestBlockChainClient::new(); + // client.add_blocks(98, BlocksWith::Uncle); + // client.add_blocks(1, BlocksWith::UncleAndTransaction); + // client.add_blocks(1, BlocksWith::Transaction); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + + let good_blocks = vec![client.block_hash_delta_minus(2)]; + let bad_blocks = vec![client.block_hash_delta_minus(1)]; + + let mut queue = VecDeque::new(); + let io = TestIo::new(&mut client, &mut queue, None); + + // when + sync.chain_new_blocks(&io, &[], &good_blocks); + assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); + sync.chain_new_blocks(&io, &good_blocks, &bad_blocks); + + // then + let status = sync.transaction_queue.lock().unwrap().status(); + assert_eq!(status.pending, 1); + assert_eq!(status.future, 0); + } + #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 74541660d..44f3f02e0 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -54,6 +54,7 @@ extern crate ethcore; extern crate env_logger; extern crate time; extern crate rand; +extern crate rayon; #[macro_use] extern crate heapsize; @@ -70,8 +71,7 @@ use io::NetSyncIo; mod chain; mod io; mod range_collection; -// TODO [todr] Made public to suppress dead code warnings -pub mod transaction_queue; +mod transaction_queue; #[cfg(test)] mod tests; @@ -153,8 +153,14 @@ impl NetworkProtocolHandler for EthSync { } fn message(&self, io: &NetworkContext, message: &SyncMessage) { - if let SyncMessage::BlockVerified = *message { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); + match *message { + SyncMessage::BlockVerified => { + self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); + }, + SyncMessage::NewChainBlocks { ref good, ref bad } => { + let sync_io = NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad); + } } } } diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index b788e0c2a..302836920 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -105,6 +105,10 @@ impl BlockChainClient for TestBlockChainClient { Some(U256::zero()) } + fn nonce(&self, _address: &Address) -> U256 { + U256::zero() + } + fn code(&self, _address: &Address) -> Option { unimplemented!(); } From c6934431d12af045364bb9256d45305969ccee67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 25 Feb 2016 11:49:12 +0100 Subject: [PATCH 2/5] Adding test for sync.chain_new_blocks. --- sync/src/chain.rs | 33 ++++++++++---------- sync/src/tests/chain.rs | 51 ++++++++++++++++--------------- sync/src/tests/helpers.rs | 57 +++++++++++++++++++++++++++-------- sync/src/transaction_queue.rs | 1 + 4 files changed, 88 insertions(+), 54 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index fa033813e..9edab791a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1425,7 +1425,7 @@ mod tests { #[test] fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let io = TestIo::new(&mut client, &mut queue, None); @@ -1438,7 +1438,7 @@ mod tests { #[test] fn calculates_tree_for_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(15, false); + client.add_blocks(15, BlocksWith::Uncle); let start = client.block_hash_delta_minus(4); let end = client.block_hash_delta_minus(2); @@ -1455,7 +1455,7 @@ mod tests { #[test] fn sends_new_hashes_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); @@ -1475,7 +1475,7 @@ mod tests { #[test] fn sends_latest_block_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); @@ -1495,7 +1495,7 @@ mod tests { #[test] fn handles_peer_new_block_mallformed() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, BlocksWith::Uncle); let block_data = get_dummy_block(11, client.chain_info().best_block_hash); @@ -1513,7 +1513,7 @@ mod tests { #[test] fn handles_peer_new_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, BlocksWith::Uncle); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); @@ -1531,7 +1531,7 @@ mod tests { #[test] fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1547,7 +1547,7 @@ mod tests { #[test] fn handles_peer_new_hashes() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1563,7 +1563,7 @@ mod tests { #[test] fn handles_peer_new_hashes_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, false); + client.add_blocks(10, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1581,7 +1581,7 @@ mod tests { #[test] fn hashes_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); @@ -1600,7 +1600,7 @@ mod tests { #[test] fn block_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let best_hash = client.chain_info().best_block_hash.clone(); @@ -1618,9 +1618,9 @@ mod tests { fn should_add_transactions_to_queue() { // given let mut client = TestBlockChainClient::new(); - // client.add_blocks(98, BlocksWith::Uncle); - // client.add_blocks(1, BlocksWith::UncleAndTransaction); - // client.add_blocks(1, BlocksWith::Transaction); + client.add_blocks(98, BlocksWith::Uncle); + client.add_blocks(1, BlocksWith::UncleAndTransaction); + client.add_blocks(1, BlocksWith::Transaction); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let good_blocks = vec![client.block_hash_delta_minus(2)]; @@ -1631,6 +1631,7 @@ mod tests { // when sync.chain_new_blocks(&io, &[], &good_blocks); + assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); sync.chain_new_blocks(&io, &good_blocks, &bad_blocks); @@ -1643,7 +1644,7 @@ mod tests { #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1667,7 +1668,7 @@ mod tests { #[test] fn returns_requested_block_headers_reverse() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, false); + client.add_blocks(100, BlocksWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 1dd9a1e78..78663aa16 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -24,8 +24,8 @@ use super::helpers::*; fn two_peers() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); @@ -35,8 +35,8 @@ fn two_peers() { fn status_after_sync() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); net.sync(); let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); @@ -45,8 +45,8 @@ fn status_after_sync() { #[test] fn takes_few_steps() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, false); - net.peer_mut(2).chain.add_blocks(100, false); + net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(100, BlocksWith::Uncle); let total_steps = net.sync(); assert!(total_steps < 7); } @@ -56,8 +56,9 @@ fn empty_blocks() { ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { - net.peer_mut(1).chain.add_blocks(5, n % 2 == 0); - net.peer_mut(2).chain.add_blocks(5, n % 2 == 0); + let with = if n % 2 == 0 { BlocksWith::Nothing } else { BlocksWith::Uncle }; + net.peer_mut(1).chain.add_blocks(5, with.clone()); + net.peer_mut(2).chain.add_blocks(5, with); } net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); @@ -68,14 +69,14 @@ fn empty_blocks() { fn forked() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, false); - net.peer_mut(1).chain.add_blocks(300, false); - net.peer_mut(2).chain.add_blocks(300, false); - net.peer_mut(0).chain.add_blocks(100, true); //fork - net.peer_mut(1).chain.add_blocks(200, false); - net.peer_mut(2).chain.add_blocks(200, false); - net.peer_mut(1).chain.add_blocks(100, false); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, true); + net.peer_mut(0).chain.add_blocks(300, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(300, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(300, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, BlocksWith::Nothing); //fork + net.peer_mut(1).chain.add_blocks(200, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(200, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, BlocksWith::Nothing); // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); @@ -87,8 +88,8 @@ fn forked() { #[test] fn restart() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, false); - net.peer_mut(2).chain.add_blocks(1000, false); + net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); net.sync_steps(8); @@ -109,8 +110,8 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, false); - net.peer_mut(1).chain.add_blocks(1, false); + net.peer_mut(0).chain.add_blocks(100, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1, BlocksWith::Uncle); net.start(); @@ -123,10 +124,10 @@ fn status_packet() { #[test] fn propagade_hashes() { let mut net = TestNet::new(6); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, false); + net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); net.sync(); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -149,10 +150,10 @@ fn propagade_hashes() { #[test] fn propagade_blocks() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, false); + net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -164,7 +165,7 @@ fn propagade_blocks() { #[test] fn restart_on_malformed_block() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, false); + net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); net.peer_mut(1).chain.corrupt_block(6); net.sync_steps(10); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 302836920..bb980e3f9 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -22,7 +22,7 @@ use io::SyncIo; use chain::ChainSync; use ::SyncConfig; use ethcore::receipt::Receipt; -use ethcore::transaction::LocalizedTransaction; +use ethcore::transaction::{LocalizedTransaction, Transaction, Action}; use ethcore::filter::Filter; use ethcore::log_entry::LocalizedLogEntry; @@ -34,6 +34,14 @@ pub struct TestBlockChainClient { pub difficulty: RwLock, } +#[derive(Clone)] +pub enum BlocksWith { + Nothing, + Uncle, + Transaction, + UncleAndTransaction +} + impl TestBlockChainClient { pub fn new() -> TestBlockChainClient { @@ -44,30 +52,53 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), }; - client.add_blocks(1, true); // add genesis block + client.add_blocks(1, BlocksWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().unwrap().clone(); client } - pub fn add_blocks(&mut self, count: usize, empty: bool) { + pub fn add_blocks(&mut self, count: usize, with: BlocksWith) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); header.difficulty = From::from(n); header.parent_hash = self.last_hash.read().unwrap().clone(); header.number = n as BlockNumber; - let mut uncles = RlpStream::new_list(if empty {0} else {1}); - if !empty { - let mut uncle_header = BlockHeader::new(); - uncle_header.difficulty = From::from(n); - uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); - uncle_header.number = n as BlockNumber; - uncles.append(&uncle_header); - header.uncles_hash = uncles.as_raw().sha3(); - } + let uncles = match with { + BlocksWith::Uncle | BlocksWith::UncleAndTransaction => { + let mut uncles = RlpStream::new_list(1); + let mut uncle_header = BlockHeader::new(); + uncle_header.difficulty = From::from(n); + uncle_header.parent_hash = self.last_hash.read().unwrap().clone(); + uncle_header.number = n as BlockNumber; + uncles.append(&uncle_header); + header.uncles_hash = uncles.as_raw().sha3(); + uncles + }, + _ => RlpStream::new_list(0) + }; + let txs = match with { + BlocksWith::Transaction | BlocksWith::UncleAndTransaction => { + let mut txs = RlpStream::new_list(1); + let keypair = KeyPair::create().unwrap(); + let tx = Transaction { + action: Action::Create, + value: U256::from(100), + data: "3331600055".from_hex().unwrap(), + gas: U256::from(100_000), + gas_price: U256::one(), + nonce: U256::one() + }; + let signed_tx = tx.sign(&keypair.secret()); + txs.append(&signed_tx); + txs.out() + }, + _ => rlp::NULL_RLP.to_vec() + }; + let mut rlp = RlpStream::new_list(3); rlp.append(&header); - rlp.append_raw(&rlp::NULL_RLP, 1); + rlp.append_raw(&txs, 1); rlp.append_raw(uncles.as_raw(), 1); self.import_block(rlp.as_raw().to_vec()).unwrap(); } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index 341607afe..fa6026477 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -346,6 +346,7 @@ impl TransactionQueue { return; } else if next_nonce > nonce { // Droping transaction + trace!(target: "sync", "Dropping transaction with nonce: {} - expecting: {}", nonce, next_nonce); return; } From bcaed67eaa0257c8998925287fd2dbc40df12698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 4 Mar 2016 16:48:10 +0100 Subject: [PATCH 3/5] Swapping order of inserting block to chain and commiting to DB to avoid race conditions --- ethcore/src/client.rs | 12 +++++++----- sync/src/transaction_queue.rs | 4 ---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index d442a3d88..fdcd6c057 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -329,18 +329,14 @@ impl Client { bad_blocks.insert(header.hash()); continue; } - let closed_block = self.check_and_close_block(&block); if let Err(_) = closed_block { bad_blocks.insert(header.hash()); break; } - - // Insert block - let closed_block = closed_block.unwrap(); - self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone()); good_blocks.push(header.hash()); + // Are we committing an era? let ancient = if header.number() >= HISTORY { let n = header.number() - HISTORY; let chain = self.chain.read().unwrap(); @@ -350,10 +346,16 @@ impl Client { }; // Commit results + let closed_block = closed_block.unwrap(); + let receipts = closed_block.block().receipts().clone(); closed_block.drain() .commit(header.number(), &header.hash(), ancient) .expect("State DB commit failed."); + // And update the chain + self.chain.write().unwrap() + .insert_block(&block.bytes, receipts); + self.report.write().unwrap().accrue_block(&block); trace!(target: "client", "Imported #{} ({})", header.number(), header.hash()); } diff --git a/sync/src/transaction_queue.rs b/sync/src/transaction_queue.rs index f551fe435..83665dfda 100644 --- a/sync/src/transaction_queue.rs +++ b/sync/src/transaction_queue.rs @@ -240,7 +240,6 @@ impl TransactionQueue { let sender = transaction.sender(); let nonce = transaction.nonce(); - println!("Removing tx: {:?}", transaction.transaction); // Remove from future self.future.drop(&sender, &nonce); @@ -266,7 +265,6 @@ impl TransactionQueue { // Goes to future or is removed let order = self.current.drop(&sender, &k).unwrap(); if k >= current_nonce { - println!("Moving to future: {:?}", order); self.future.insert(sender.clone(), k, order.update_height(k, current_nonce)); } else { self.by_hash.remove(&order.hash); @@ -310,7 +308,6 @@ impl TransactionQueue { // remove also from priority and hash self.future.by_priority.remove(&order); // Put to current - println!("Moved: {:?}", order); let order = order.update_height(current_nonce.clone(), first_nonce); self.current.insert(address.clone(), current_nonce, order); current_nonce = current_nonce + U256::one(); @@ -331,7 +328,6 @@ impl TransactionQueue { .cloned() .map_or_else(|| fetch_nonce(&address), |n| n + U256::one()); - println!("Expected next: {:?}, got: {:?}", next_nonce, nonce); // Check height if nonce > next_nonce { let order = TransactionOrder::for_transaction(&tx, next_nonce); From 8a13e87cbeb0905077a25092f088a54482ab2e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 11:30:09 +0100 Subject: [PATCH 4/5] Renaming BlocksWith helper to EachBlockWith --- sync/src/chain.rs | 34 +++++++++++++-------------- sync/src/tests/chain.rs | 48 +++++++++++++++++++-------------------- sync/src/tests/helpers.rs | 10 ++++---- 3 files changed, 46 insertions(+), 46 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 983ce62c3..e8ad81a3a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -491,7 +491,7 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { - if self.current_base_block() < header.number { + if self.current_base_block() < header.number { self.last_imported_block = Some(header.number); self.remove_downloaded_blocks(header.number); } @@ -1433,7 +1433,7 @@ mod tests { #[test] fn finds_lagging_peers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10)); let chain_info = client.chain_info(); @@ -1447,7 +1447,7 @@ mod tests { #[test] fn calculates_tree_for_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(15, BlocksWith::Uncle); + client.add_blocks(15, EachBlockWith::Uncle); let start = client.block_hash_delta_minus(4); let end = client.block_hash_delta_minus(2); @@ -1464,7 +1464,7 @@ mod tests { #[test] fn sends_new_hashes_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1483,7 +1483,7 @@ mod tests { #[test] fn sends_latest_block_to_lagging_peer() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1501,7 +1501,7 @@ mod tests { #[test] fn handles_peer_new_block_mallformed() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let block_data = get_dummy_block(11, client.chain_info().best_block_hash); @@ -1519,7 +1519,7 @@ mod tests { #[test] fn handles_peer_new_block() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); @@ -1537,7 +1537,7 @@ mod tests { #[test] fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1553,7 +1553,7 @@ mod tests { #[test] fn handles_peer_new_hashes() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1569,7 +1569,7 @@ mod tests { #[test] fn handles_peer_new_hashes_empty() { let mut client = TestBlockChainClient::new(); - client.add_blocks(10, BlocksWith::Uncle); + client.add_blocks(10, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let mut io = TestIo::new(&mut client, &mut queue, None); @@ -1587,7 +1587,7 @@ mod tests { #[test] fn hashes_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1605,7 +1605,7 @@ mod tests { #[test] fn block_rlp_mutually_acceptable() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let chain_info = client.chain_info(); @@ -1622,9 +1622,9 @@ mod tests { fn should_add_transactions_to_queue() { // given let mut client = TestBlockChainClient::new(); - client.add_blocks(98, BlocksWith::Uncle); - client.add_blocks(1, BlocksWith::UncleAndTransaction); - client.add_blocks(1, BlocksWith::Transaction); + client.add_blocks(98, EachBlockWith::Uncle); + client.add_blocks(1, EachBlockWith::UncleAndTransaction); + client.add_blocks(1, EachBlockWith::Transaction); let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let good_blocks = vec![client.block_hash_delta_minus(2)]; @@ -1648,7 +1648,7 @@ mod tests { #[test] fn returns_requested_block_headers() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1672,7 +1672,7 @@ mod tests { #[test] fn returns_requested_block_headers_reverse() { let mut client = TestBlockChainClient::new(); - client.add_blocks(100, BlocksWith::Uncle); + client.add_blocks(100, EachBlockWith::Uncle); let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index b388f508d..58f50916e 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -24,8 +24,8 @@ use super::helpers::*; fn two_peers() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); assert!(net.peer(0).chain.block(BlockId::Number(1000)).is_some()); assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref()); @@ -35,8 +35,8 @@ fn two_peers() { fn status_after_sync() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync(); let status = net.peer(0).sync.status(); assert_eq!(status.state, SyncState::Idle); @@ -45,8 +45,8 @@ fn status_after_sync() { #[test] fn takes_few_steps() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(100, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle); let total_steps = net.sync(); assert!(total_steps < 7); } @@ -56,7 +56,7 @@ fn empty_blocks() { ::env_logger::init().ok(); let mut net = TestNet::new(3); for n in 0..200 { - let with = if n % 2 == 0 { BlocksWith::Nothing } else { BlocksWith::Uncle }; + let with = if n % 2 == 0 { EachBlockWith::Nothing } else { EachBlockWith::Uncle }; net.peer_mut(1).chain.add_blocks(5, with.clone()); net.peer_mut(2).chain.add_blocks(5, with); } @@ -69,14 +69,14 @@ fn empty_blocks() { fn forked() { ::env_logger::init().ok(); let mut net = TestNet::new(3); - net.peer_mut(0).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(300, BlocksWith::Uncle); - net.peer_mut(0).chain.add_blocks(100, BlocksWith::Nothing); //fork - net.peer_mut(1).chain.add_blocks(200, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(200, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(100, BlocksWith::Uncle); //fork between 1 and 2 - net.peer_mut(2).chain.add_blocks(10, BlocksWith::Nothing); + net.peer_mut(0).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(300, EachBlockWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Nothing); //fork + net.peer_mut(1).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(200, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle); //fork between 1 and 2 + net.peer_mut(2).chain.add_blocks(10, EachBlockWith::Nothing); // peer 1 has the best chain of 601 blocks let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone(); net.sync(); @@ -88,8 +88,8 @@ fn forked() { #[test] fn restart() { let mut net = TestNet::new(3); - net.peer_mut(1).chain.add_blocks(1000, BlocksWith::Uncle); - net.peer_mut(2).chain.add_blocks(1000, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(1000, EachBlockWith::Uncle); + net.peer_mut(2).chain.add_blocks(1000, EachBlockWith::Uncle); net.sync_steps(8); @@ -110,8 +110,8 @@ fn status_empty() { #[test] fn status_packet() { let mut net = TestNet::new(2); - net.peer_mut(0).chain.add_blocks(100, BlocksWith::Uncle); - net.peer_mut(1).chain.add_blocks(1, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(100, EachBlockWith::Uncle); + net.peer_mut(1).chain.add_blocks(1, EachBlockWith::Uncle); net.start(); @@ -124,10 +124,10 @@ fn status_packet() { #[test] fn propagate_hashes() { let mut net = TestNet::new(6); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -150,10 +150,10 @@ fn propagate_hashes() { #[test] fn propagate_blocks() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.peer_mut(0).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.trigger_block_verified(0); //first event just sets the marker net.trigger_block_verified(0); @@ -165,7 +165,7 @@ fn propagate_blocks() { #[test] fn restart_on_malformed_block() { let mut net = TestNet::new(2); - net.peer_mut(1).chain.add_blocks(10, BlocksWith::Uncle); + net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle); net.peer_mut(1).chain.corrupt_block(6); net.sync_steps(10); diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index ca7776bf3..5b53ad90b 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -35,7 +35,7 @@ pub struct TestBlockChainClient { } #[derive(Clone)] -pub enum BlocksWith { +pub enum EachBlockWith { Nothing, Uncle, Transaction, @@ -52,12 +52,12 @@ impl TestBlockChainClient { last_hash: RwLock::new(H256::new()), difficulty: RwLock::new(From::from(0)), }; - client.add_blocks(1, BlocksWith::Nothing); // add genesis block + client.add_blocks(1, EachBlockWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().unwrap().clone(); client } - pub fn add_blocks(&mut self, count: usize, with: BlocksWith) { + pub fn add_blocks(&mut self, count: usize, with: EachBlockWith) { let len = self.numbers.read().unwrap().len(); for n in len..(len + count) { let mut header = BlockHeader::new(); @@ -65,7 +65,7 @@ impl TestBlockChainClient { header.parent_hash = self.last_hash.read().unwrap().clone(); header.number = n as BlockNumber; let uncles = match with { - BlocksWith::Uncle | BlocksWith::UncleAndTransaction => { + EachBlockWith::Uncle | EachBlockWith::UncleAndTransaction => { let mut uncles = RlpStream::new_list(1); let mut uncle_header = BlockHeader::new(); uncle_header.difficulty = From::from(n); @@ -78,7 +78,7 @@ impl TestBlockChainClient { _ => RlpStream::new_list(0) }; let txs = match with { - BlocksWith::Transaction | BlocksWith::UncleAndTransaction => { + EachBlockWith::Transaction | EachBlockWith::UncleAndTransaction => { let mut txs = RlpStream::new_list(1); let keypair = KeyPair::create().unwrap(); let tx = Transaction { From b9a6a70cede04e276aaa9e1ea38ee5cef17d3133 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Sat, 5 Mar 2016 11:37:19 +0100 Subject: [PATCH 5/5] Renaming bad blocks as retracted --- ethcore/src/client.rs | 2 +- ethcore/src/service.rs | 2 +- sync/src/chain.rs | 11 ++++++----- sync/src/lib.rs | 4 ++-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index ef0356d3e..878bacce9 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -410,7 +410,7 @@ impl Client where V: Verifier { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - bad: bad_blocks, + retracted: bad_blocks, })).unwrap(); } } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index 756d02407..a80adb0ba 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,7 +30,7 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain - bad: Vec, + retracted: Vec, }, /// A block is ready BlockVerified, diff --git a/sync/src/chain.rs b/sync/src/chain.rs index e8ad81a3a..ddf30854a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1265,10 +1265,11 @@ impl ChainSync { } /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) { + pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) + // Client should send message after commit to db and inserting to chain. .expect("Expected in-chain blocks."); let block = BlockView::new(&block); block.transactions() @@ -1277,14 +1278,14 @@ impl ChainSync { let chain = io.chain(); let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); + let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); good.for_each(|txs| { let mut transaction_queue = self.transaction_queue.lock().unwrap(); let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); }); - bad.for_each(|txs| { + retracted.for_each(|txs| { // populate sender for tx in &txs { let _sender = tx.sender(); @@ -1628,7 +1629,7 @@ mod tests { let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); let good_blocks = vec![client.block_hash_delta_minus(2)]; - let bad_blocks = vec![client.block_hash_delta_minus(1)]; + let retracted_blocks = vec![client.block_hash_delta_minus(1)]; let mut queue = VecDeque::new(); let io = TestIo::new(&mut client, &mut queue, None); @@ -1637,7 +1638,7 @@ mod tests { sync.chain_new_blocks(&io, &[], &good_blocks); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &bad_blocks); + sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); // then let status = sync.transaction_queue.lock().unwrap().status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 44f3f02e0..d67a09f3b 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -157,9 +157,9 @@ impl NetworkProtocolHandler for EthSync { SyncMessage::BlockVerified => { self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); }, - SyncMessage::NewChainBlocks { ref good, ref bad } => { + SyncMessage::NewChainBlocks { ref good, ref retracted } => { let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad); + self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); } } }