From 4af85b488b659d63d54f3d34cd5eb1af675feaf1 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 5 Feb 2016 22:54:33 +0100 Subject: [PATCH 01/12] Fixed an issue with forked counters --- util/src/journaldb.rs | 90 ++++++++++++++++++++++++++++++------------- 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 810b06727..2173fdeb6 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector}; #[cfg(test)] use std::env; @@ -105,6 +105,11 @@ impl JournalDB { // for each end_era that we journaled that we are no passing by, // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. + // + // we also keep track of the counters for each key inserted in the journal to handle the following cases: + // key K is removed in block A(N) and re-inserted in block B(N + C) (where C < H). K must not be deleted from the DB. + // key K is added in block A(N) and reverted in block B(N + C) (where C < H). K must be deleted + // key K is added in blocks A(N) and A'(N) and is reverted in block B(N + C ) (where C < H). K must not be deleted // record new commit's details. let batch = WriteBatch::new(); @@ -125,6 +130,7 @@ impl JournalDB { let mut r = RlpStream::new_list(3); let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); + // Increase counter for each insrted key no matter if the block is canonical or not. for i in &inserts { *counters.entry(i.clone()).or_insert(0) += 1; } @@ -139,6 +145,7 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; + let mut canon_data: Option = None; while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); @@ -146,35 +153,26 @@ impl JournalDB { last = r.drain(); &last })) { - let to_add; - let rlp = Rlp::new(&rlp_data); - { - to_add = rlp.val_at(1); - for i in &to_add { - let delete_counter = { - if let Some(mut cnt) = counters.get_mut(i) { - *cnt -= 1; - *cnt == 0 - } - else { false } - - }; - if delete_counter { - counters.remove(i); - } - } + let canon = { + let rlp = Rlp::new(&rlp_data); + if canon_id != rlp.val_at(0) { + let to_add: Vec = rlp.val_at(1); + JournalDB::apply_removes(&to_add, &to_add, &mut counters, &batch); + false + } else { true } + }; + if canon { + canon_data = Some(rlp_data) } - let to_remove: Vec = if canon_id == rlp.val_at(0) {rlp.val_at(2)} else {to_add}; - for i in &to_remove { - if !counters.contains_key(i) { - batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); - } - } - try!(batch.delete(&last)); - trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len()); index += 1; } + // Canon must be commited last to handle a case when counter reaches 0 in a sibling block + if let Some(ref c) = canon_data { + let rlp = Rlp::new(&c); + let deleted = JournalDB::apply_removes(&rlp.val_at::>(1), &rlp.val_at::>(2), &mut counters, &batch); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deleted); + } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); } @@ -200,6 +198,29 @@ impl JournalDB { Ok(ret) } + fn apply_removes(added: &[H256], removed: &[H256], counters: &mut HashMap, batch: &WriteBatch) -> usize { + let mut deleted = 0usize; + // Decrease the counters first + for i in added.iter() { + let delete_counter = { + if let Some(mut cnt) = counters.get_mut(i) { + *cnt -= 1; + *cnt == 0 + } + else { false } + }; + if delete_counter { + counters.remove(i); + } + } + // Remove only if counter reached zero + for i in removed.iter().filter(|i| !counters.contains_key(i)) { + batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); + deleted += 1; + } + deleted + } + fn payload(&self, key: &H256) -> Option { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } @@ -387,4 +408,21 @@ mod tests { jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + #[test] + fn fork_same_key() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + + let foo = jdb.insert(b"foo"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.insert(b"foo"); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } } From 2163d8d597945c93efb7f514feb82ca88cf47cf5 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sat, 6 Feb 2016 00:03:16 +0100 Subject: [PATCH 02/12] Block queue clear now clears everything --- ethcore/src/block_queue.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ethcore/src/block_queue.rs b/ethcore/src/block_queue.rs index 2e3728aee..808982b99 100644 --- a/ethcore/src/block_queue.rs +++ b/ethcore/src/block_queue.rs @@ -205,6 +205,8 @@ impl BlockQueue { let mut verification = self.verification.lock().unwrap(); verification.unverified.clear(); verification.verifying.clear(); + verification.verified.clear(); + self.processing.write().unwrap().clear(); } /// Wait for queue to be empty From f2ed89be46adf21b032550acd4831e8fbe1bfeb0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 5 Feb 2016 22:54:33 +0100 Subject: [PATCH 03/12] Fixed an issue with forked counters --- util/src/journaldb.rs | 90 ++++++++++++++++++++++++++++++------------- 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 810b06727..2173fdeb6 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector}; #[cfg(test)] use std::env; @@ -105,6 +105,11 @@ impl JournalDB { // for each end_era that we journaled that we are no passing by, // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. + // + // we also keep track of the counters for each key inserted in the journal to handle the following cases: + // key K is removed in block A(N) and re-inserted in block B(N + C) (where C < H). K must not be deleted from the DB. + // key K is added in block A(N) and reverted in block B(N + C) (where C < H). K must be deleted + // key K is added in blocks A(N) and A'(N) and is reverted in block B(N + C ) (where C < H). K must not be deleted // record new commit's details. let batch = WriteBatch::new(); @@ -125,6 +130,7 @@ impl JournalDB { let mut r = RlpStream::new_list(3); let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); + // Increase counter for each insrted key no matter if the block is canonical or not. for i in &inserts { *counters.entry(i.clone()).or_insert(0) += 1; } @@ -139,6 +145,7 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; + let mut canon_data: Option = None; while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); @@ -146,35 +153,26 @@ impl JournalDB { last = r.drain(); &last })) { - let to_add; - let rlp = Rlp::new(&rlp_data); - { - to_add = rlp.val_at(1); - for i in &to_add { - let delete_counter = { - if let Some(mut cnt) = counters.get_mut(i) { - *cnt -= 1; - *cnt == 0 - } - else { false } - - }; - if delete_counter { - counters.remove(i); - } - } + let canon = { + let rlp = Rlp::new(&rlp_data); + if canon_id != rlp.val_at(0) { + let to_add: Vec = rlp.val_at(1); + JournalDB::apply_removes(&to_add, &to_add, &mut counters, &batch); + false + } else { true } + }; + if canon { + canon_data = Some(rlp_data) } - let to_remove: Vec = if canon_id == rlp.val_at(0) {rlp.val_at(2)} else {to_add}; - for i in &to_remove { - if !counters.contains_key(i) { - batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); - } - } - try!(batch.delete(&last)); - trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, to_remove.len()); index += 1; } + // Canon must be commited last to handle a case when counter reaches 0 in a sibling block + if let Some(ref c) = canon_data { + let rlp = Rlp::new(&c); + let deleted = JournalDB::apply_removes(&rlp.val_at::>(1), &rlp.val_at::>(2), &mut counters, &batch); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deleted); + } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); } @@ -200,6 +198,29 @@ impl JournalDB { Ok(ret) } + fn apply_removes(added: &[H256], removed: &[H256], counters: &mut HashMap, batch: &WriteBatch) -> usize { + let mut deleted = 0usize; + // Decrease the counters first + for i in added.iter() { + let delete_counter = { + if let Some(mut cnt) = counters.get_mut(i) { + *cnt -= 1; + *cnt == 0 + } + else { false } + }; + if delete_counter { + counters.remove(i); + } + } + // Remove only if counter reached zero + for i in removed.iter().filter(|i| !counters.contains_key(i)) { + batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); + deleted += 1; + } + deleted + } + fn payload(&self, key: &H256) -> Option { self.backing.get(&key.bytes()).expect("Low-level database error. Some issue with your hard disk?").map(|v| v.to_vec()) } @@ -387,4 +408,21 @@ mod tests { jdb.commit(3, &b"2".sha3(), Some((0, b"2".sha3()))).unwrap(); assert!(jdb.exists(&foo)); } + + #[test] + fn fork_same_key() { + // history is 1 + let mut jdb = JournalDB::new_temp(); + jdb.commit(0, &b"0".sha3(), None).unwrap(); + + let foo = jdb.insert(b"foo"); + jdb.commit(1, &b"1a".sha3(), Some((0, b"0".sha3()))).unwrap(); + + jdb.insert(b"foo"); + jdb.commit(1, &b"1b".sha3(), Some((0, b"0".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + + jdb.commit(2, &b"2a".sha3(), Some((1, b"1a".sha3()))).unwrap(); + assert!(jdb.exists(&foo)); + } } From b2f69a08ca1ecc893f86b39ad5d6f4ded3be95c9 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 7 Feb 2016 11:50:56 +0100 Subject: [PATCH 04/12] Clarified counters --- util/src/journaldb.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 2173fdeb6..a2b0981cc 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -106,10 +106,12 @@ impl JournalDB { // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. // - // we also keep track of the counters for each key inserted in the journal to handle the following cases: - // key K is removed in block A(N) and re-inserted in block B(N + C) (where C < H). K must not be deleted from the DB. - // key K is added in block A(N) and reverted in block B(N + C) (where C < H). K must be deleted - // key K is added in blocks A(N) and A'(N) and is reverted in block B(N + C ) (where C < H). K must not be deleted + // we also keep track of the counters for each key inserted in the journal to handle + // the following cases where key K must not be deleted from the DB: + // Given H is the journal size in eras, 0 <= C <= H. + // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). + // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). + // Key K is added in canonical era A(N) and non-canonicnal B'(N + C). // record new commit's details. let batch = WriteBatch::new(); From 976b10a4efa3bb4c1d0c3e16714bcd1dc0071a81 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 7 Feb 2016 12:10:28 +0100 Subject: [PATCH 05/12] Removed warning supression --- util/src/journaldb.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index a2b0981cc..5d3276631 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -92,7 +92,6 @@ impl JournalDB { /// Commit all recent insert operations and historical removals from the old era /// to the backing database. - #[allow(cyclomatic_complexity)] pub fn commit(&mut self, now: u64, id: &H256, end: Option<(u64, H256)>) -> Result { // journal format: // [era, 0] => [ id, [insert_0, ...], [remove_0, ...] ] From 28dd73e340097615cc3f8b75f9fb73354c2bd397 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 7 Feb 2016 18:28:15 +0100 Subject: [PATCH 06/12] Updated counter comment --- util/src/journaldb.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 5d3276631..c13bdee62 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -110,7 +110,7 @@ impl JournalDB { // Given H is the journal size in eras, 0 <= C <= H. // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). - // Key K is added in canonical era A(N) and non-canonicnal B'(N + C). + // Key K is added in non-canonical era A'(N) canonical B(N + C). // record new commit's details. let batch = WriteBatch::new(); From 3f17acca1d89f3cfcab663f88886a317ac63527d Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Sun, 7 Feb 2016 23:01:09 +0300 Subject: [PATCH 07/12] empty new block test --- sync/src/chain.rs | 16 ++++++++++++++++ sync/src/tests/chain.rs | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index dc9caad9a..778da490c 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1304,4 +1304,20 @@ mod tests { // NEW_BLOCK_PACKET assert_eq!(0x07, io.queue[0].packet_id); } + + #[test] + fn handles_empty_peer_new_block() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(10, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let empty_data = vec![]; + let block = UntrustedRlp::new(&empty_data); + + let result = sync.on_peer_new_block(&mut io, 0, &block); + + assert!(result.is_err()); + } } \ No newline at end of file diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 34f94f7e2..6526d8500 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -138,7 +138,7 @@ fn propagade_hashes() { #[test] fn propagade_blocks() { - let mut net = TestNet::new(10); + let mut net = TestNet::new(2); net.peer_mut(1).chain.add_blocks(10, false); net.sync(); From 6c36a7e1a6258643ec8f5f00c1a9d127b9e96401 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 7 Feb 2016 21:18:51 +0100 Subject: [PATCH 08/12] Apply all removes after updating all counters --- util/src/journaldb.rs | 61 +++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index c13bdee62..371e60c58 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -20,7 +20,7 @@ use common::*; use rlp::*; use hashdb::*; use memorydb::*; -use rocksdb::{DB, Writable, WriteBatch, IteratorMode, DBVector}; +use rocksdb::{DB, Writable, WriteBatch, IteratorMode}; #[cfg(test)] use std::env; @@ -105,12 +105,16 @@ impl JournalDB { // we remove all of its removes assuming it is canonical and all // of its inserts otherwise. // - // we also keep track of the counters for each key inserted in the journal to handle - // the following cases where key K must not be deleted from the DB: + // We also keep reference counters for each key inserted in the journal to handle + // the following cases where key K must not be deleted from the DB when processing removals : // Given H is the journal size in eras, 0 <= C <= H. // Key K is removed in era A(N) and re-inserted in canonical era B(N + C). // Key K is removed in era A(N) and re-inserted in non-canonical era B`(N + C). // Key K is added in non-canonical era A'(N) canonical B(N + C). + // + // The counter is encreased each time a key is inserted in the journal in the commit. The list of insertions + // is saved with the era record. When the era becomes end_era and goes out of journal the counter is decreased + // and the key is safe to delete. // record new commit's details. let batch = WriteBatch::new(); @@ -131,7 +135,7 @@ impl JournalDB { let mut r = RlpStream::new_list(3); let inserts: Vec = self.overlay.keys().iter().filter(|&(_, &c)| c > 0).map(|(key, _)| key.clone()).collect(); - // Increase counter for each insrted key no matter if the block is canonical or not. + // Increase counter for each inserted key no matter if the block is canonical or not. for i in &inserts { *counters.entry(i.clone()).or_insert(0) += 1; } @@ -146,7 +150,8 @@ impl JournalDB { if let Some((end_era, canon_id)) = end { let mut index = 0usize; let mut last; - let mut canon_data: Option = None; + let mut to_remove: Vec = Vec::new(); + let mut canon_inserts: Vec = Vec::new(); while let Some(rlp_data) = try!(self.backing.get({ let mut r = RlpStream::new_list(2); r.append(&end_era); @@ -154,30 +159,30 @@ impl JournalDB { last = r.drain(); &last })) { - let canon = { - let rlp = Rlp::new(&rlp_data); - if canon_id != rlp.val_at(0) { - let to_add: Vec = rlp.val_at(1); - JournalDB::apply_removes(&to_add, &to_add, &mut counters, &batch); - false - } else { true } - }; - if canon { - canon_data = Some(rlp_data) + let rlp = Rlp::new(&rlp_data); + let inserts: Vec = rlp.val_at(1); + JournalDB::decrease_counters(&inserts, &mut counters); + // Collect keys to be removed. These are removed keys for canonical block, inserted for non-canonical + if canon_id == rlp.val_at(0) { + to_remove.extend(rlp.at(2).iter().map(|r| r.as_val::())); + canon_inserts = inserts; + } + else { + to_remove.extend(inserts); } try!(batch.delete(&last)); index += 1; } - // Canon must be commited last to handle a case when counter reaches 0 in a sibling block - if let Some(ref c) = canon_data { - let rlp = Rlp::new(&c); - let deleted = JournalDB::apply_removes(&rlp.val_at::>(1), &rlp.val_at::>(2), &mut counters, &batch); - trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deleted); - } + let canon_inserts = canon_inserts.drain(..).collect::>(); + // Purge removed keys if they are not referenced and not re-inserted in the canon commit + for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { + try!(batch.delete(&h)); + } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); } + // Commit overlay insertions let mut ret = 0u32; let mut deletes = 0usize; for i in self.overlay.drain().into_iter() { @@ -199,10 +204,10 @@ impl JournalDB { Ok(ret) } - fn apply_removes(added: &[H256], removed: &[H256], counters: &mut HashMap, batch: &WriteBatch) -> usize { - let mut deleted = 0usize; - // Decrease the counters first - for i in added.iter() { + + // Decrease counters for given keys. Deletes obsolete counters + fn decrease_counters(keys: &[H256], counters: &mut HashMap) { + for i in keys.iter() { let delete_counter = { if let Some(mut cnt) = counters.get_mut(i) { *cnt -= 1; @@ -214,12 +219,6 @@ impl JournalDB { counters.remove(i); } } - // Remove only if counter reached zero - for i in removed.iter().filter(|i| !counters.contains_key(i)) { - batch.delete(&i).expect("Low-level database error. Some issue with your hard disk?"); - deleted += 1; - } - deleted } fn payload(&self, key: &H256) -> Option { From 4b1d67ef49923cc67d21eebc9e0df38efb569020 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Mon, 8 Feb 2016 00:08:15 +0300 Subject: [PATCH 09/12] bunch of tests for new block packet --- sync/src/chain.rs | 72 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 778da490c..31f03fd9a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -434,12 +434,11 @@ impl ChainSync { let block_rlp = try!(r.at(0)); let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); - trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h); - let header_view = HeaderView::new(header_rlp.as_raw()); + let header: BlockHeader = try!(header_rlp.as_val()); let mut unknown = false; // TODO: Decompose block and add to self.headers and self.bodies instead - if header_view.number() == From::from(self.current_base_block() + 1) { + if header.number == From::from(self.current_base_block() + 1) { match io.chain().import_block(block_rlp.as_raw().to_vec()) { Err(ImportError::AlreadyInChain) => { trace!(target: "sync", "New block already in chain {:?}", h); @@ -472,7 +471,7 @@ impl ChainSync { trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); { let peer = self.peers.get_mut(&peer_id).expect("ChainSync: unknown peer"); - peer.latest = header_view.sha3(); + peer.latest = header.hash(); } self.sync_peer(io, peer_id, true); } @@ -1153,7 +1152,32 @@ mod tests { use super::*; use util::*; use super::{PeerInfo, PeerAsking}; - use ethcore::header::{BlockNumber}; + use ethcore::header::*; + use ethcore::client::*; + + fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { + let mut header = Header::new(); + header.gas_limit = x!(0); + header.difficulty = x!(order * 100); + header.timestamp = (order * 10) as u64; + header.number = order as u64; + header.parent_hash = parent_hash; + header.state_root = H256::zero(); + + let mut rlp = RlpStream::new_list(3); + rlp.append(&header); + rlp.append_raw(&rlp::EMPTY_LIST_RLP, 1); + rlp.append_raw(&rlp::EMPTY_LIST_RLP, 1); + rlp.out() + } + + fn get_dummy_blocks(order: u32, parent_hash: H256) -> Bytes { + let mut rlp = RlpStream::new_list(1); + rlp.append_raw(&get_dummy_block(order, parent_hash), 1); + let difficulty: U256 = x!(100 * order); + rlp.append(&difficulty); + rlp.out() + } #[test] fn return_receipts_empty() { @@ -1306,7 +1330,43 @@ mod tests { } #[test] - fn handles_empty_peer_new_block() { + fn handles_peer_new_block_mallformed() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(10, false); + + let block_data = get_dummy_block(11, client.chain_info().best_block_hash); + + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let block = UntrustedRlp::new(&block_data); + + let result = sync.on_peer_new_block(&mut io, 0, &block); + + assert!(result.is_err()); + } + + #[test] + fn handles_peer_new_block() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(10, false); + + let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash); + + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let block = UntrustedRlp::new(&block_data); + + let result = sync.on_peer_new_block(&mut io, 0, &block); + + assert!(result.is_ok()); + } + + #[test] + fn handles_peer_new_block_empty() { let mut client = TestBlockChainClient::new(); client.add_blocks(10, false); let mut queue = VecDeque::new(); From 63f2f42035f0bef9b508e74fff73c6b1f3781c59 Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 7 Feb 2016 22:16:58 +0100 Subject: [PATCH 10/12] Restored trace --- util/src/journaldb.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/util/src/journaldb.rs b/util/src/journaldb.rs index 371e60c58..e805f0a60 100644 --- a/util/src/journaldb.rs +++ b/util/src/journaldb.rs @@ -176,10 +176,13 @@ impl JournalDB { let canon_inserts = canon_inserts.drain(..).collect::>(); // Purge removed keys if they are not referenced and not re-inserted in the canon commit + let mut deletes = 0; for h in to_remove.iter().filter(|h| !counters.contains_key(h) && !canon_inserts.contains(h)) { try!(batch.delete(&h)); + deletes += 1; } try!(batch.put(&LAST_ERA_KEY, &encode(&end_era))); + trace!("JournalDB: delete journal for time #{}.{}, (canon was {}): {} entries", end_era, index, canon_id, deletes); } // Commit overlay insertions From e9af2dfd9669548a15efdc1404574a0a4b38ae29 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Mon, 8 Feb 2016 00:20:59 +0300 Subject: [PATCH 11/12] new hashes tests --- sync/src/chain.rs | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 31f03fd9a..571e73226 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1179,6 +1179,21 @@ mod tests { rlp.out() } + fn get_dummy_hashes() -> Bytes { + let mut rlp = RlpStream::new_list(5); + for _ in 0..5 { + let mut hash_d_rlp = RlpStream::new_list(2); + let hash: H256 = H256::from(0u64); + let diff: U256 = U256::from(1u64); + hash_d_rlp.append(&hash); + hash_d_rlp.append(&diff); + + rlp.append_raw(&hash_d_rlp.out(), 1); + } + + rlp.out() + } + #[test] fn return_receipts_empty() { let mut client = TestBlockChainClient::new(); @@ -1380,4 +1395,36 @@ mod tests { assert!(result.is_err()); } + + #[test] + fn handles_peer_new_hashes() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(10, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let hashes_data = get_dummy_hashes(); + let hashes_rlp = UntrustedRlp::new(&hashes_data); + + let result = sync.on_peer_new_hashes(&mut io, 0, &hashes_rlp); + + assert!(result.is_ok()); + } + + #[test] + fn handles_peer_new_hashes_empty() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(10, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + let empty_hashes_data = vec![]; + let hashes_rlp = UntrustedRlp::new(&empty_hashes_data); + + let result = sync.on_peer_new_hashes(&mut io, 0, &hashes_rlp); + + assert!(result.is_ok()); + } } \ No newline at end of file From 871b7113ecf59bf02758400dee8cbf542e2469e6 Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Mon, 8 Feb 2016 01:39:02 +0300 Subject: [PATCH 12/12] fixes for valid rlp --- sync/src/chain.rs | 49 +++++++++++++++++++++++++++++++++++---- sync/src/tests/helpers.rs | 2 +- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 571e73226..0c3c9f4bc 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -1056,7 +1056,15 @@ impl ChainSync { match route.blocks.len() { 0 => None, _ => { - Some(rlp::encode(&route.blocks).to_vec()) + let mut rlp_stream = RlpStream::new_list(route.blocks.len()); + for block_hash in route.blocks { + let mut hash_rlp = RlpStream::new_list(2); + let difficulty = chain.block_total_difficulty(&block_hash).expect("Mallformed block without a difficulty on the chain!"); + hash_rlp.append(&block_hash); + hash_rlp.append(&difficulty); + rlp_stream.append_raw(&hash_rlp.out(), 1); + } + Some(rlp_stream.out()) } } }, @@ -1065,7 +1073,10 @@ impl ChainSync { } fn create_latest_block_rlp(chain: &BlockChainClient) -> Bytes { - chain.block(&chain.chain_info().best_block_hash).expect("Creating latest block when there is none") + let mut rlp_stream = RlpStream::new_list(2); + rlp_stream.append_raw(&chain.block(&chain.chain_info().best_block_hash).expect("Creating latest block when there is none"), 1); + rlp_stream.append(&chain.chain_info().total_difficulty); + rlp_stream.out() } fn get_lagging_peers(&self, io: &SyncIo) -> Vec { @@ -1304,8 +1315,8 @@ mod tests { assert!(rlp.is_none()); let rlp = ChainSync::create_new_hashes_rlp(&client, &start, &end).unwrap(); - // size of three rlp encoded hash - assert_eq!(101, rlp.len()); + // size of three rlp encoded hash-difficulty + assert_eq!(107, rlp.len()); } #[test] @@ -1427,4 +1438,34 @@ mod tests { assert!(result.is_ok()); } + + #[test] + fn hashes_rlp_mutually_acceptable() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + sync.propagade_new_hashes(&mut io); + + let data = &io.queue[0].data.clone(); + let result = sync.on_peer_new_hashes(&mut io, 0, &UntrustedRlp::new(&data)); + assert!(result.is_ok()); + } + + #[test] + fn block_rlp_mutually_acceptable() { + let mut client = TestBlockChainClient::new(); + client.add_blocks(100, false); + let mut queue = VecDeque::new(); + let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5)); + let mut io = TestIo::new(&mut client, &mut queue, None); + + sync.propagade_blocks(&mut io); + + let data = &io.queue[0].data.clone(); + let result = sync.on_peer_new_block(&mut io, 0, &UntrustedRlp::new(&data)); + assert!(result.is_ok()); + } } \ No newline at end of file diff --git a/sync/src/tests/helpers.rs b/sync/src/tests/helpers.rs index da82363dd..d155fee6b 100644 --- a/sync/src/tests/helpers.rs +++ b/sync/src/tests/helpers.rs @@ -79,7 +79,7 @@ impl TestBlockChainClient { impl BlockChainClient for TestBlockChainClient { fn block_total_difficulty(&self, _h: &H256) -> Option { - unimplemented!(); + Some(U256::zero()) } fn block_header(&self, h: &H256) -> Option {