diff --git a/ethcore/src/client.rs b/ethcore/src/client.rs index b6bcccb35..874fc9646 100644 --- a/ethcore/src/client.rs +++ b/ethcore/src/client.rs @@ -418,7 +418,9 @@ impl Client where V: Verifier { if !good_blocks.is_empty() && block_queue.queue_info().is_empty() { io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks { good: good_blocks, - retracted: bad_blocks, + bad: bad_blocks, + // TODO [todr] were to take those from? + retracted: vec![], })).unwrap(); } } diff --git a/ethcore/src/service.rs b/ethcore/src/service.rs index a80adb0ba..443d09e3b 100644 --- a/ethcore/src/service.rs +++ b/ethcore/src/service.rs @@ -30,6 +30,8 @@ pub enum SyncMessage { /// Hashes of blocks imported to blockchain good: Vec, /// Hashes of blocks not imported to blockchain + bad: Vec, + /// Hashes of blocks that were removed from canonical chain retracted: Vec, }, /// A block is ready diff --git a/sync/src/chain.rs b/sync/src/chain.rs index a67e27868..7294570fe 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -207,7 +207,7 @@ pub struct ChainSync { /// True if common block for our and remote chain has been found have_common_block: bool, /// Last propagated block number - last_send_block_number: BlockNumber, + last_sent_block_number: BlockNumber, /// Max blocks to download ahead max_download_ahead_blocks: usize, /// Network ID @@ -236,7 +236,7 @@ impl ChainSync { last_imported_hash: None, syncing_difficulty: U256::from(0u64), have_common_block: false, - last_send_block_number: 0, + last_sent_block_number: 0, max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks), network_id: config.network_id, transaction_queue: Mutex::new(TransactionQueue::new()), @@ -850,8 +850,8 @@ impl ChainSync { self.downloading_bodies.remove(&n); self.downloading_headers.remove(&n); } - self.headers.remove_tail(&start); - self.bodies.remove_tail(&start); + self.headers.remove_from(&start); + self.bodies.remove_from(&start); } /// Request headers from a peer by block hash @@ -931,9 +931,11 @@ impl ChainSync { let item_count = r.item_count(); trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count); let fetch_latest_nonce = |a : &Address| chain.nonce(a); + + let mut transaction_queue = self.transaction_queue.lock().unwrap(); for i in 0..item_count { let tx: SignedTransaction = try!(r.val_at(i)); - self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce); + transaction_queue.add(tx, &fetch_latest_nonce); } Ok(()) } @@ -1244,26 +1246,25 @@ impl ChainSync { sent } + fn propagate_latest_blocks(&mut self, io: &mut SyncIo) { + let chain_info = io.chain().chain_info(); + if (((chain_info.best_block_number as i64) - (self.last_sent_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { + let blocks = self.propagate_blocks(&chain_info, io); + let hashes = self.propagate_new_hashes(&chain_info, io); + if blocks != 0 || hashes != 0 { + trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); + } + } + self.last_sent_block_number = chain_info.best_block_number; + } + /// Maintain other peers. Send out any new blocks and transactions pub fn maintain_sync(&mut self, io: &mut SyncIo) { self.check_resume(io); } - /// should be called once chain has new block, triggers the latest block propagation - pub fn chain_blocks_verified(&mut self, io: &mut SyncIo) { - let chain = io.chain().chain_info(); - if (((chain.best_block_number as i64) - (self.last_send_block_number as i64)).abs() as BlockNumber) < MAX_PEER_LAG_PROPAGATION { - let blocks = self.propagate_blocks(&chain, io); - let hashes = self.propagate_new_hashes(&chain, io); - if blocks != 0 || hashes != 0 { - trace!(target: "sync", "Sent latest {} blocks and {} hashes to peers.", blocks, hashes); - } - } - self.last_send_block_number = chain.best_block_number; - } - - /// called when block is imported to chain, updates transactions queue - pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) { + /// called when block is imported to chain, updates transactions queue and propagates the blocks + pub fn chain_new_blocks(&mut self, io: &mut SyncIo, good: &[H256], bad: &[H256], _retracted: &[H256]) { fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec { let block = chain .block(BlockId::Hash(hash.clone())) @@ -1274,23 +1275,29 @@ impl ChainSync { } - let chain = io.chain(); - let good = good.par_iter().map(|h| fetch_transactions(chain, h)); - let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h)); + { + let chain = io.chain(); + let good = good.par_iter().map(|h| fetch_transactions(chain, h)); + let bad = bad.par_iter().map(|h| fetch_transactions(chain, h)); - good.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); - transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); - }); - retracted.for_each(|txs| { - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - let mut transaction_queue = self.transaction_queue.lock().unwrap(); - transaction_queue.add_all(txs, |a| chain.nonce(a)); - }); + good.for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + let hashes = txs.iter().map(|tx| tx.hash()).collect::>(); + transaction_queue.remove_all(&hashes, |a| chain.nonce(a)); + }); + bad.for_each(|txs| { + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + let mut transaction_queue = self.transaction_queue.lock().unwrap(); + transaction_queue.add_all(txs, |a| chain.nonce(a)); + }); + } + + // Propagate latests blocks + self.propagate_latest_blocks(io); + // TODO [todr] propagate transactions? } pub fn transaction_queue(&self) -> &Mutex { @@ -1634,13 +1641,13 @@ mod tests { let retracted_blocks = vec![client.block_hash_delta_minus(1)]; let mut queue = VecDeque::new(); - let io = TestIo::new(&mut client, &mut queue, None); + let mut io = TestIo::new(&mut client, &mut queue, None); // when - sync.chain_new_blocks(&io, &[], &good_blocks); + sync.chain_new_blocks(&mut io, &[], &good_blocks, &[]); assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0); assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1); - sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks); + sync.chain_new_blocks(&mut io, &good_blocks, &retracted_blocks, &[]); // then let status = sync.transaction_queue.lock().unwrap().status(); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index e352144bd..fdcf79749 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -164,13 +164,11 @@ impl NetworkProtocolHandler for EthSync { fn message(&self, io: &NetworkContext, message: &SyncMessage) { match *message { - SyncMessage::BlockVerified => { - self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref())); + SyncMessage::NewChainBlocks { ref good, ref bad, ref retracted } => { + let mut sync_io = NetSyncIo::new(io, self.chain.deref()); + self.sync.write().unwrap().chain_new_blocks(&mut sync_io, good, bad, retracted); }, - SyncMessage::NewChainBlocks { ref good, ref retracted } => { - let sync_io = NetSyncIo::new(io, self.chain.deref()); - self.sync.write().unwrap().chain_new_blocks(&sync_io, good, retracted); - } + _ => {/* Ignore other messages */}, } } } diff --git a/sync/src/range_collection.rs b/sync/src/range_collection.rs index dc2f4e446..9bb5cc522 100644 --- a/sync/src/range_collection.rs +++ b/sync/src/range_collection.rs @@ -42,6 +42,8 @@ pub trait RangeCollection { fn remove_head(&mut self, start: &K); /// Remove all elements >= `start` in the range that contains `start` fn remove_tail(&mut self, start: &K); + /// Remove all elements >= `start` + fn remove_from(&mut self, start: &K); /// Remove all elements >= `tail` fn insert_item(&mut self, key: K, value: V); /// Get an iterator over ranges @@ -137,6 +139,28 @@ impl RangeCollection for Vec<(K, Vec)> where K: Ord + PartialEq + } } + /// Remove the element and all following it. + fn remove_from(&mut self, key: &K) { + match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) { + Ok(index) => { self.drain(.. index + 1); }, + Err(index) =>{ + let mut empty = false; + match self.get_mut(index) { + Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => { + v.truncate((*key - *k).to_usize()); + empty = v.is_empty(); + } + _ => {} + } + if empty { + self.drain(.. index + 1); + } else { + self.drain(.. index); + } + }, + } + } + /// Remove range elements up to key fn remove_head(&mut self, key: &K) { if *key == FromUsize::from_usize(0) { @@ -272,5 +296,17 @@ fn test_range() { assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); r.remove_tail(&2); assert_eq!(r.range_iter().next(), None); + + let mut r = ranges.clone(); + r.remove_from(&20); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal); + r.remove_from(&17); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal); + r.remove_from(&15); + assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal); + r.remove_from(&3); + assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal); + r.remove_from(&2); + assert_eq!(r.range_iter().next(), None); } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 58f50916e..855aa79a6 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -129,8 +129,8 @@ fn propagate_hashes() { net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); net.sync(); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); // 5 peers to sync assert_eq!(5, net.peer(0).queue.len()); @@ -154,8 +154,8 @@ fn propagate_blocks() { net.sync(); net.peer_mut(0).chain.add_blocks(10, EachBlockWith::Uncle); - net.trigger_block_verified(0); //first event just sets the marker - net.trigger_block_verified(0); + net.trigger_chain_new_blocks(0); //first event just sets the marker + net.trigger_chain_new_blocks(0); assert!(!net.peer(0).queue.is_empty()); // NEW_BLOCK_PACKET diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index 5b53ad90b..d01dba0b2 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -455,8 +455,8 @@ impl TestNet { self.peers.iter().all(|p| p.queue.is_empty()) } - pub fn trigger_block_verified(&mut self, peer_id: usize) { + pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) { let mut peer = self.peer_mut(peer_id); - peer.sync.chain_blocks_verified(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None)); + peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[]); } }