New sync algorithm

This commit is contained in:
arkpar 2016-05-16 14:41:41 +02:00
parent dfac17538f
commit ca6c91f591
9 changed files with 487 additions and 824 deletions

6
Cargo.lock generated
View File

@ -332,7 +332,7 @@ dependencies = [
"mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"nix 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.4.3",
"rocksdb 0.4.3 (git+https://github.com/ethcore/rust-rocksdb)",
"rust-crypto 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-serialize 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
@ -628,6 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "librocksdb-sys"
version = "0.2.3"
source = "git+https://github.com/ethcore/rust-rocksdb#6b6ce93e2828182691e00da57fdfb2926226f1f1"
dependencies = [
"libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -969,9 +970,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "rocksdb"
version = "0.4.3"
source = "git+https://github.com/ethcore/rust-rocksdb#6b6ce93e2828182691e00da57fdfb2926226f1f1"
dependencies = [
"libc 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)",
"librocksdb-sys 0.2.3",
"librocksdb-sys 0.2.3 (git+https://github.com/ethcore/rust-rocksdb)",
]
[[package]]

View File

@ -191,11 +191,23 @@ impl TestBlockChainClient {
}
}
/// TODO:
/// Make a bad block by setting invalid extra data.
pub fn corrupt_block(&mut self, n: BlockNumber) {
let hash = self.block_hash(BlockID::Number(n)).unwrap();
let mut header: BlockHeader = decode(&self.block_header(BlockID::Number(n)).unwrap());
header.parent_hash = H256::new();
header.extra_data = b"This extra data is way too long to be considered valid".to_vec();
let mut rlp = RlpStream::new_list(3);
rlp.append(&header);
rlp.append_raw(&rlp::NULL_RLP, 1);
rlp.append_raw(&rlp::NULL_RLP, 1);
self.blocks.write().unwrap().insert(hash, rlp.out());
}
/// Make a bad block by setting invalid parent hash.
pub fn corrupt_block_parent(&mut self, n: BlockNumber) {
let hash = self.block_hash(BlockID::Number(n)).unwrap();
let mut header: BlockHeader = decode(&self.block_header(BlockID::Number(n)).unwrap());
header.parent_hash = H256::from(42);
let mut rlp = RlpStream::new_list(3);
rlp.append(&header);
rlp.append_raw(&rlp::NULL_RLP, 1);
@ -229,8 +241,8 @@ impl BlockChainClient for TestBlockChainClient {
Some(U256::zero())
}
fn block_hash(&self, _id: BlockID) -> Option<H256> {
unimplemented!();
fn block_hash(&self, id: BlockID) -> Option<H256> {
Self::block_hash(self, id)
}
fn nonce(&self, address: &Address) -> U256 {

View File

@ -233,8 +233,8 @@ impl<C, S, A, M, EM> Eth for EthClient<C, S, A, M, EM>
Params::None => {
let status = take_weak!(self.sync).status();
let res = match status.state {
SyncState::NotSynced | SyncState::Idle => SyncStatus::None,
SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks => SyncStatus::Info(SyncInfo {
SyncState::Idle => SyncStatus::None,
SyncState::Waiting | SyncState::Blocks | SyncState::NewBlocks | SyncState::ChainHead => SyncStatus::Info(SyncInfo {
starting_block: U256::from(status.start_block_number),
current_block: U256::from(take_weak!(self.client).chain_info().best_block_number),
highest_block: U256::from(status.highest_block_number.unwrap_or(status.start_block_number))

View File

@ -39,7 +39,7 @@ impl TestSyncProvider {
pub fn new(config: Config) -> Self {
TestSyncProvider {
status: RwLock::new(SyncStatus {
state: SyncState::NotSynced,
state: SyncState::Idle,
network_id: config.network_id,
protocol_version: 63,
start_block_number: 0,

File diff suppressed because it is too large Load Diff

View File

@ -74,8 +74,8 @@ use io::NetSyncIo;
use chain::ChainSync;
mod chain;
mod blocks;
mod io;
mod range_collection;
#[cfg(test)]
mod tests;
@ -116,9 +116,10 @@ pub use self::chain::{SyncStatus, SyncState};
impl EthSync {
/// Creates and register protocol with the network service
pub fn register(service: &mut NetworkService<SyncMessage>, config: SyncConfig, chain: Arc<Client>, miner: Arc<Miner>) -> Arc<EthSync> {
let sync = ChainSync::new(config, miner, chain.deref());
let sync = Arc::new(EthSync {
chain: chain,
sync: RwLock::new(ChainSync::new(config, miner)),
sync: RwLock::new(sync),
});
service.register_protocol(sync.clone(), "eth", &[62u8, 63u8]).expect("Error registering eth protocol handler");
sync

View File

@ -1,317 +0,0 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.
// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.
/// This module defines a trait for a collection of ranged values and an implementation
/// for this trait over sorted vector.
use std::ops::{Add, Sub, Range};
pub trait ToUsize {
fn to_usize(&self) -> usize;
}
pub trait FromUsize {
fn from_usize(s: usize) -> Self;
}
/// A key-value collection orderd by key with sequential key-value pairs grouped together.
/// Such group is called a range.
/// E.g. a set of collection of 5 pairs {1, a}, {2, b}, {10, x}, {11, y}, {12, z} will be grouped into two ranges: {1, [a,b]}, {10, [x,y,z]}
pub trait RangeCollection<K, V> {
/// Check if the given key is present in the collection.
fn have_item(&self, key: &K) -> bool;
/// Get value by key.
fn find_item(&self, key: &K) -> Option<&V>;
/// Get a range of keys from `key` till the end of the range that has `key`
/// Returns an empty range is key does not exist.
fn get_tail(&mut self, key: &K) -> Range<K>;
/// Remove all elements < `start` in the range that contains `start` - 1
fn remove_head(&mut self, start: &K);
/// Remove all elements >= `start` in the range that contains `start`
fn remove_tail(&mut self, start: &K);
/// Remove all elements >= `start`
fn remove_from(&mut self, start: &K);
/// Remove all elements >= `tail`
fn insert_item(&mut self, key: K, value: V);
/// Get an iterator over ranges
fn range_iter(& self) -> RangeIterator<K, V>;
}
/// Range iterator. For each range yelds a key for the first element of the range and a vector of values.
pub struct RangeIterator<'c, K:'c, V:'c> {
range: usize,
collection: &'c Vec<(K, Vec<V>)>
}
impl<'c, K:'c, V:'c> Iterator for RangeIterator<'c, K, V> where K: Add<Output = K> + FromUsize + ToUsize + Copy {
type Item = (K, &'c [V]);
// The 'Iterator' trait only requires the 'next' method to be defined. The
// return type is 'Option<T>', 'None' is returned when the 'Iterator' is
// over, otherwise the next value is returned wrapped in 'Some'
fn next(&mut self) -> Option<(K, &'c [V])> {
if self.range > 0 {
self.range -= 1;
}
else {
return None;
}
match self.collection.get(self.range) {
Some(&(ref k, ref vec)) => {
Some((*k, vec))
},
None => None
}
}
}
impl<K, V> RangeCollection<K, V> for Vec<(K, Vec<V>)> where K: Ord + PartialEq + Add<Output = K> + Sub<Output = K> + Copy + FromUsize + ToUsize {
fn range_iter(&self) -> RangeIterator<K, V> {
RangeIterator {
range: self.len(),
collection: self
}
}
fn have_item(&self, key: &K) -> bool {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(_) => true,
Err(index) => match self.get(index) {
Some(&(ref k, ref v)) => k <= key && (*k + FromUsize::from_usize(v.len())) > *key,
_ => false
},
}
}
fn find_item(&self, key: &K) -> Option<&V> {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => self.get(index).unwrap().1.get(0),
Err(index) => match self.get(index) {
Some(&(ref k, ref v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => v.get((*key - *k).to_usize()),
_ => None
},
}
}
fn get_tail(&mut self, key: &K) -> Range<K> {
let kv = *key;
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => kv..(kv + FromUsize::from_usize(self[index].1.len())),
Err(index) => {
match self.get_mut(index) {
Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
kv..(*k + FromUsize::from_usize(v.len()))
}
_ => kv..kv
}
},
}
}
/// Remove element key and following elements in the same range
fn remove_tail(&mut self, key: &K) {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => { self.remove(index); },
Err(index) =>{
let mut empty = false;
match self.get_mut(index) {
Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
v.truncate((*key - *k).to_usize());
empty = v.is_empty();
}
_ => {}
}
if empty {
self.remove(index);
}
},
}
}
/// Remove the element and all following it.
fn remove_from(&mut self, key: &K) {
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(index) => { self.drain(.. index + 1); },
Err(index) =>{
let mut empty = false;
match self.get_mut(index) {
Some(&mut (ref k, ref mut v)) if k <= key && (*k + FromUsize::from_usize(v.len())) > *key => {
v.truncate((*key - *k).to_usize());
empty = v.is_empty();
}
_ => {}
}
if empty {
self.drain(.. index + 1);
} else {
self.drain(.. index);
}
},
}
}
/// Remove range elements up to key
fn remove_head(&mut self, key: &K) {
if *key == FromUsize::from_usize(0) {
return
}
let prev = *key - FromUsize::from_usize(1);
match self.binary_search_by(|&(k, _)| k.cmp(key).reverse()) {
Ok(_) => { }, //start of range, do nothing.
Err(index) => {
let mut empty = false;
match self.get_mut(index) {
Some(&mut (ref mut k, ref mut v)) if *k <= prev && (*k + FromUsize::from_usize(v.len())) > prev => {
let tail = v.split_off((*key - *k).to_usize());
empty = tail.is_empty();
let removed = ::std::mem::replace(v, tail);
let new_k = *k + FromUsize::from_usize(removed.len());
::std::mem::replace(k, new_k);
}
_ => {}
}
if empty {
self.remove(index);
}
},
}
}
fn insert_item(&mut self, key: K, value: V) {
assert!(!self.have_item(&key));
// todo: fix warning
let lower = match self.binary_search_by(|&(k, _)| k.cmp(&key).reverse()) {
Ok(index) | Err(index) => index
};
let mut to_remove: Option<usize> = None;
if lower < self.len() && self[lower].0 + FromUsize::from_usize(self[lower].1.len()) == key {
// extend into existing chunk
self[lower].1.push(value);
}
else {
// insert a new chunk
let range: Vec<V> = vec![value];
self.insert(lower, (key, range));
};
if lower > 0 {
let next = lower - 1;
if next < self.len()
{
{
let (mut next, mut inserted) = self.split_at_mut(lower);
let mut next = next.last_mut().unwrap();
let mut inserted = inserted.first_mut().unwrap();
if next.0 == key + FromUsize::from_usize(1)
{
inserted.1.append(&mut next.1);
to_remove = Some(lower - 1);
}
}
if let Some(r) = to_remove {
self.remove(r);
}
}
}
}
}
#[test]
#[cfg_attr(feature="dev", allow(cyclomatic_complexity))]
fn test_range() {
use std::cmp::{Ordering};
let mut ranges: Vec<(u64, Vec<char>)> = Vec::new();
assert_eq!(ranges.range_iter().next(), None);
assert_eq!(ranges.find_item(&1), None);
assert!(!ranges.have_item(&1));
assert_eq!(ranges.get_tail(&0), 0..0);
ranges.insert_item(17, 'q');
assert_eq!(ranges.range_iter().cmp(vec![(17, &['q'][..])]), Ordering::Equal);
assert_eq!(ranges.find_item(&17), Some(&'q'));
assert!(ranges.have_item(&17));
assert_eq!(ranges.get_tail(&17), 17..18);
ranges.insert_item(18, 'r');
assert_eq!(ranges.range_iter().cmp(vec![(17, &['q', 'r'][..])]), Ordering::Equal);
assert_eq!(ranges.find_item(&18), Some(&'r'));
assert!(ranges.have_item(&18));
assert_eq!(ranges.get_tail(&17), 17..19);
ranges.insert_item(16, 'p');
assert_eq!(ranges.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal);
assert_eq!(ranges.find_item(&16), Some(&'p'));
assert_eq!(ranges.find_item(&17), Some(&'q'));
assert_eq!(ranges.find_item(&18), Some(&'r'));
assert!(ranges.have_item(&16));
assert_eq!(ranges.get_tail(&17), 17..19);
assert_eq!(ranges.get_tail(&16), 16..19);
ranges.insert_item(2, 'b');
assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
assert_eq!(ranges.find_item(&2), Some(&'b'));
ranges.insert_item(3, 'c');
ranges.insert_item(4, 'd');
assert_eq!(ranges.get_tail(&3), 3..5);
assert_eq!(ranges.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
let mut r = ranges.clone();
r.remove_head(&1);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_head(&2);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_head(&3);
assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_head(&10);
assert_eq!(r.range_iter().cmp(vec![(3, &['c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_head(&5);
assert_eq!(r.range_iter().cmp(vec![(16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_head(&19);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_tail(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_tail(&17);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
r.remove_tail(&16);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_tail(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_tail(&2);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&18);
assert!(!r.have_item(&18));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q'][..])]), Ordering::Equal);
r.remove_from(&16);
assert!(!r.have_item(&16));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&1);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&2);
assert_eq!(r.range_iter().next(), None);
}

View File

@ -30,6 +30,16 @@ fn two_peers() {
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
}
#[test]
fn long_chain() {
::env_logger::init().ok();
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(50000, EachBlockWith::Nothing);
net.sync();
assert!(net.peer(0).chain.block(BlockID::Number(50000)).is_some());
assert_eq!(net.peer(0).chain.blocks.read().unwrap().deref(), net.peer(1).chain.blocks.read().unwrap().deref());
}
#[test]
fn status_after_sync() {
::env_logger::init().ok();
@ -47,7 +57,7 @@ fn takes_few_steps() {
net.peer_mut(1).chain.add_blocks(100, EachBlockWith::Uncle);
net.peer_mut(2).chain.add_blocks(100, EachBlockWith::Uncle);
let total_steps = net.sync();
assert!(total_steps < 7);
assert!(total_steps < 20);
}
#[test]
@ -79,6 +89,7 @@ fn forked() {
// peer 1 has the best chain of 601 blocks
let peer1_chain = net.peer(1).chain.numbers.read().unwrap().clone();
net.sync();
assert_eq!(net.peer(0).chain.difficulty.read().unwrap().deref(), net.peer(1).chain.difficulty.read().unwrap().deref());
assert_eq!(net.peer(0).chain.numbers.read().unwrap().deref(), &peer1_chain);
assert_eq!(net.peer(1).chain.numbers.read().unwrap().deref(), &peer1_chain);
assert_eq!(net.peer(2).chain.numbers.read().unwrap().deref(), &peer1_chain);
@ -97,13 +108,13 @@ fn restart() {
net.restart_peer(0);
let status = net.peer(0).sync.status();
assert_eq!(status.state, SyncState::NotSynced);
assert_eq!(status.state, SyncState::ChainHead);
}
#[test]
fn status_empty() {
let net = TestNet::new(2);
assert_eq!(net.peer(0).sync.status().state, SyncState::NotSynced);
assert_eq!(net.peer(0).sync.status().state, SyncState::Idle);
}
#[test]
@ -166,8 +177,17 @@ fn restart_on_malformed_block() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block(6);
net.sync_steps(10);
net.sync_steps(20);
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 4);
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}
#[test]
fn restart_on_broken_chain() {
let mut net = TestNet::new(2);
net.peer_mut(1).chain.add_blocks(10, EachBlockWith::Uncle);
net.peer_mut(1).chain.corrupt_block_parent(6);
net.sync_steps(20);
assert_eq!(net.peer(0).chain.chain_info().best_block_number, 5);
}

View File

@ -92,9 +92,11 @@ impl TestNet {
started: false,
};
for _ in 0..n {
let chain = TestBlockChainClient::new();
let sync = ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test()), &chain);
net.peers.push(TestPeer {
chain: TestBlockChainClient::new(),
sync: ChainSync::new(SyncConfig::default(), Miner::new(false, Spec::new_test())),
sync: sync,
chain: chain,
queue: VecDeque::new(),
});
}