arkpar 2016-01-08 16:26:00 +01:00
parent 290d738e3f
commit 9ea7f14542
3 changed files with 27 additions and 23 deletions

Changed file 1 of 3:

@@ -81,15 +81,14 @@ pub struct SyncStatus {
     pub state: SyncState,
     pub protocol_version: u8,
     pub start_block_number: BlockNumber,
-    pub current_block_number: BlockNumber,
+    pub last_imported_block_number: BlockNumber,
     pub highest_block_number: BlockNumber,
     pub blocks_total: usize,
-    pub blocks_received: usize
+    pub blocks_received: usize,
 }
 
 #[derive(PartialEq, Eq, Debug)]
-enum PeerAsking
-{
+enum PeerAsking {
     Nothing,
     BlockHeaders,
     BlockBodies,
@@ -102,7 +101,7 @@ struct PeerInfo {
     latest: H256,
     difficulty: U256,
     asking: PeerAsking,
-    asking_blocks: Vec<BlockNumber>
+    asking_blocks: Vec<BlockNumber>,
 }
 
 type Body = Bytes;
@@ -119,9 +118,9 @@ pub struct ChainSync {
     /// Set of block body numbers being downloaded
     downloading_bodies: HashSet<BlockNumber>,
     /// Downloaded headers.
-    headers: Vec<(BlockNumber, Vec<Header>)>, //TODO: use BTreeMap once range API is sable. For now it a vector sorted in descending order
+    headers: Vec<(BlockNumber, Vec<Header>)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order
     /// Downloaded bodies
-    bodies: Vec<(BlockNumber, Vec<Body>)>, //TODO: use BTreeMap once range API is sable. For now it a vector sorted in descending order
+    bodies: Vec<(BlockNumber, Vec<Body>)>, //TODO: use BTreeMap once range API is sable. For now it is a vector sorted in descending order
     /// Peer info
     peers: HashMap<PeerId, PeerInfo>,
     /// Used to map body to header
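
The TODO above substitutes a Vec<(BlockNumber, Vec<Header>)> kept sorted in descending order for BTreeMap's then-unstable range API. A minimal standalone sketch of that technique, assuming nothing from this crate (the insert_desc helper and the stubbed Header type are illustrative only):

    // Illustrative only: emulate ordered-map insertion with a Vec sorted in
    // descending block number, as the TODO describes. `Header` is stubbed.
    type BlockNumber = u64;
    type Header = Vec<u8>;

    fn insert_desc(v: &mut Vec<(BlockNumber, Vec<Header>)>, n: BlockNumber, items: Vec<Header>) {
        // Reversing the comparator makes binary_search_by treat the vector
        // as descending, so Err(i) is the slot that keeps it sorted.
        match v.binary_search_by(|&(key, _)| n.cmp(&key)) {
            Ok(i) => v[i].1.extend(items),     // merge into an existing entry
            Err(i) => v.insert(i, (n, items)), // insert at the order-preserving slot
        }
    }

    fn main() {
        let mut headers: Vec<(BlockNumber, Vec<Header>)> = Vec::new();
        insert_desc(&mut headers, 10, vec![vec![0u8]]);
        insert_desc(&mut headers, 30, vec![vec![1u8]]);
        insert_desc(&mut headers, 20, vec![vec![2u8]]);
        let order: Vec<BlockNumber> = headers.iter().map(|&(n, _)| n).collect();
        assert_eq!(order, vec![30, 20, 10]); // descending, ready for range scans
    }
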
@@ -152,7 +151,7 @@ impl ChainSync {
             last_imported_block: 0,
             last_imported_hash: H256::new(),
             syncing_difficulty: U256::from(0u64),
-            have_common_block: false
+            have_common_block: false,
         }
     }
 
@@ -162,10 +161,10 @@ impl ChainSync {
             state: self.state.clone(),
             protocol_version: 63,
             start_block_number: self.starting_block,
-            current_block_number: 0, //TODO:
+            last_imported_block_number: self.last_imported_block,
             highest_block_number: self.highest_block,
             blocks_total: (self.last_imported_block - self.starting_block) as usize,
-            blocks_received: (self.highest_block - self.starting_block) as usize
+            blocks_received: (self.highest_block - self.starting_block) as usize,
         }
     }
 
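
With the rename, status() now reports the block number the struct already tracks instead of a hard-coded zero. As a hedged illustration of consuming those fields, here is a hypothetical progress helper over a stub of the struct; only the three fields it reads are reproduced, and the helper is not part of the crate:

    type BlockNumber = u64;

    // Stub with just the SyncStatus fields the computation needs,
    // copied from the first hunk above.
    struct SyncStatus {
        start_block_number: BlockNumber,
        last_imported_block_number: BlockNumber,
        highest_block_number: BlockNumber,
    }

    // checked_sub mirrors the care this commit takes elsewhere when
    // subtracting unsigned block numbers.
    fn sync_progress(s: &SyncStatus) -> Option<f64> {
        let done = s.last_imported_block_number.checked_sub(s.start_block_number)?;
        let total = s.highest_block_number.checked_sub(s.start_block_number)?;
        if total == 0 { return None; }
        Some(done as f64 / total as f64 * 100.0)
    }

    fn main() {
        let s = SyncStatus {
            start_block_number: 100,
            last_imported_block_number: 150,
            highest_block_number: 300,
        };
        assert_eq!(sync_progress(&s), Some(25.0)); // 50 of 200 blocks imported
    }
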
@@ -228,16 +227,16 @@ impl ChainSync {
     }
 
     /// Called by peer once it has new block headers during sync
    fn on_peer_block_headers(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
         self.reset_peer_asking(peer_id, PeerAsking::BlockHeaders);
         let item_count = r.item_count();
-        trace!(target: "sync", "{}-> BlockHeaders ({} entries)", peer_id, item_count);
+        trace!(target: "sync", "{} -> BlockHeaders ({} entries)", peer_id, item_count);
         self.clear_peer_download(peer_id);
         if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting {
             trace!(target: "sync", "Ignored unexpected block headers");
             return;
         }
         if self.state == SyncState::Waiting {
             trace!(target: "sync", "Ignored block headers while waiting");
             return;
         }
@@ -313,7 +312,7 @@ impl ChainSync {
     fn on_peer_block_bodies(&mut self, io: &mut SyncIo, peer_id: &PeerId, r: &Rlp) {
         self.reset_peer_asking(peer_id, PeerAsking::BlockBodies);
         let item_count = r.item_count();
-        trace!(target: "sync", "{}-> BlockBodies ({} entries)", peer_id, item_count);
+        trace!(target: "sync", "{} -> BlockBodies ({} entries)", peer_id, item_count);
         self.clear_peer_download(peer_id);
         if self.state != SyncState::Blocks && self.state != SyncState::NewBlocks && self.state != SyncState::Waiting {
             trace!(target: "sync", "Ignored unexpected block bodies");
@@ -353,7 +352,7 @@ impl ChainSync {
         let header_rlp = block_rlp.at(0);
         let h = header_rlp.as_raw().sha3();
 
-        trace!(target: "sync", "{}-> NewBlock ({})", peer_id, h);
+        trace!(target: "sync", "{} -> NewBlock ({})", peer_id, h);
         match io.chain().import_block(block_rlp.as_raw()) {
             ImportResult::AlreadyInChain => {
                 trace!(target: "sync", "New block already in chain {:?}", h);
@@ -386,7 +385,7 @@ impl ChainSync {
             trace!(target: "sync", "Ignoring new hashes since we're already downloading.");
             return;
         }
-        trace!(target: "sync", "{}-> NewHashes ({} entries)", peer_id, r.item_count());
+        trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count());
         let hashes = r.iter().map(|item| (item.val_at::<H256>(0), item.val_at::<U256>(1)));
         let mut max_height: U256 = From::from(0);
         for (h, d) in hashes {
@@ -491,7 +490,7 @@ impl ChainSync {
 
         if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.last_imported_block + 1 {
             for (start, ref items) in self.headers.range_iter() {
                 if needed_bodies.len() > MAX_BODIES_TO_REQUEST {
                     break;
                 }
                 let mut index: BlockNumber = 0;
@@ -732,7 +731,7 @@ impl ChainSync {
                 warn!(target:"sync", "Error sending status request: {:?}", e);
                 io.disable_peer(peer_id);
             }
-            Ok(_) => { }
+            Ok(_) => ()
         }
     }
 
@@ -765,6 +764,7 @@ impl ChainSync {
         let max_count = min(MAX_HEADERS_TO_SEND, max_headers);
         let mut count = 0;
         let mut data = Bytes::new();
+        let inc = (skip + 1) as BlockNumber;
         while number <= last && number > 0 && count < max_count {
             match io.chain().block_header_at(number) {
                 Some(mut hdr) => {
@@ -774,10 +774,13 @@ impl ChainSync {
                 None => {}
             }
             if reverse {
-                number -= (skip + 1) as BlockNumber;
+                if number <= inc {
+                    break;
+                }
+                number -= inc;
             }
             else {
-                number += (skip + 1) as BlockNumber;
+                number += inc;
             }
         }
         let mut rlp = RlpStream::new_list(count as usize);
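
The new `inc` binding and the `if number <= inc { break; }` guard stop the unsigned BlockNumber cursor from wrapping below zero when headers are requested in reverse with a skip. A self-contained sketch of the same pattern; the walk_back helper is illustrative, not crate code:

    type BlockNumber = u64;

    // Step a u64 cursor downwards by `inc`, breaking before the
    // subtraction could wrap, exactly as the guard above does.
    fn walk_back(mut number: BlockNumber, inc: BlockNumber) -> Vec<BlockNumber> {
        let mut visited = Vec::new();
        while number > 0 {
            visited.push(number);
            if number <= inc {
                break; // next step would underflow; stop instead of wrapping
            }
            number -= inc;
        }
        visited
    }

    fn main() {
        // skip = 2 gives inc = 3: from block 8 the cursor visits 8, 5, 2.
        assert_eq!(walk_back(8, 3), vec![8, 5, 2]);
    }
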

Changed file 2 of 3:

@@ -9,7 +9,7 @@ mod range_collection;
 #[cfg(test)]
 mod tests;
 
-pub fn new(_service: &mut NetworkService, eth_client: Arc<BlockChainClient+Send+Sized>) -> EthSync {
+pub fn new(_service: &mut NetworkService, eth_client: Arc<BlockChainClient + Send + Sized>) -> EthSync {
     EthSync {
         chain: eth_client,
         sync: ChainSync::new(),
@@ -56,14 +56,14 @@ impl<'s, 'h> SyncIo for NetSyncIo<'s, 'h> {
 }
 
 pub struct EthSync {
-    chain: Arc<BlockChainClient+Send+Sized>,
+    chain: Arc<BlockChainClient + Send + Sized>,
     sync: ChainSync
 }
 
 pub use self::chain::SyncStatus;
 
 impl EthSync {
-    pub fn new(chain: Arc<BlockChainClient+Send+Sized>) -> EthSync {
+    pub fn new(chain: Arc<BlockChainClient + Send + Sized>) -> EthSync {
         EthSync {
             chain: chain,
             sync: ChainSync::new(),

Changed file 3 of 3:

@@ -205,6 +205,7 @@ fn test_range() {
     assert_eq!(ranges.find_item(&18), Some(&'r'));
     assert!(ranges.have_item(&16));
     assert_eq!(ranges.get_tail(&17), 17..19);
+    assert_eq!(ranges.get_tail(&16), 16..19);
 
     ranges.insert_item(2, 'b');
     assert_eq!(ranges.range_iter().cmp(vec![(2, &['b'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
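
The added assertion pins down get_tail at the first element of a stored range, not just mid-range. To make the intended semantics concrete, a hedged re-implementation over a plain vector of (start, items) pairs; this is not the crate's RangeCollection, only an illustration consistent with the two asserts:

    use std::ops::Range;

    // Stand-in for get_tail over a plain (start, items) vector: the tail
    // runs from the key itself to the end of the range containing it.
    fn get_tail(ranges: &[(u64, Vec<char>)], key: &u64) -> Range<u64> {
        for &(start, ref items) in ranges {
            let end = start + items.len() as u64;
            if *key >= start && *key < end {
                return *key..end;
            }
        }
        *key..*key // empty range when the key is not covered
    }

    fn main() {
        // Mirrors the test data: 'p', 'q', 'r' stored at blocks 16, 17, 18.
        let ranges = vec![(16u64, vec!['p', 'q', 'r'])];
        assert_eq!(get_tail(&ranges, &17), 17..19);
        assert_eq!(get_tail(&ranges, &16), 16..19); // the newly asserted case
    }
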